Привет, Хабр! Меня зовут Агаджанян Давид и ранее я опубликовал статью «простые highload паттерны на Go», в которой были рассмотрены простые подходы увеличения пропускной способности отдельно взятого экземпляра приложения без хардкора. Мне импонируют простые подходы, так как over engineering подходы требуются в исключительных проектах, и то чаще всего только в отдельно взятых подсистемах, в остальном можно воспользоваться давно известными лучшими практиками. Статья ориентирована на начинающих разработчиков и может служить отправной точкой для дальнейшего изучения. В этой статье подходы ориентированы на масштабирование приложения при верхнеуровневом рассмотрении. Подходы универсальны, приписка языка Go только потому, что сама реализация приложения на нем. Для наглядности, примеры будут упрощены, а чтобы было интереснее наблюдать, проведем замеры производительности. Ну и конечно же делитесь своим опытом масштабирования, буду рад взять на заметку.

Правильный замер производительности как отдельная наука, необходимо создать правильную среду и снизить погрешности до минимума, в этой статье будут использованы стандартные инструменты и в качестве выходных значений будут использоваться усредненные с десятков запусков

Stateless horizontal scaling

one instance
one instance

Есть приложение, которое крутится в бою и вполне себе успешно выполняет свою работу. Постепенно трафик растет, пользователи становятся ненасытнее и все активнее пользуются приложением. И вот настигает пик, при котором CPU/RAM физически уже не хватает и требуется принять меры. Решение простое и очень эффективное - создать реплики приложения

Пример: есть приложение, которое в реальном времени производит сложные CPU расчеты. Трафик приложения растет, CPU железки подходит к пределу, а само приложение и так уже оптимизировано насколько хватило компетенций команды разработки. Расти и покупать более дорогостоящее оборудование уже не целесообразно

Концепт: поставить несколько обычных железок и реплицировать на них приложение, поставив перед ними load balancer

multiple instances with load balancer
multiple instances with load balancer

Реализация: примеры кода ниже, но они также доступны в github

Попробуем провести замеры производительности одного сервиса и затем нескольких сервисов с балансировщиком нагрузки перед ними

Для простоты имитации ограничения железки и наглядности воспользуемся инструментом docker и docker-compose

Возможность ограничения по CPU в docker-compose была добавлена в версии "2.2". Подробнее про ограничения по ресурсам можно найти в этой документации

Смотрим пример

version: "3.9"
services:
  redis:
    image: redis:alpine
    deploy:
      resources:
        limits:
          cpus: '0.50' <- то, что нам нужно
          memory: 50M
        reservations:
          cpus: '0.25'
          memory: 20M

In this general example, the redis service is constrained to use no more than 50M of memory and 0.50 (50% of a single core) of available processing time (CPU), and has 20M of memory and 0.25 CPU time reserved (as always available to it).

Документация говорит, что 0.5 означает, что контейнер получит не более 50% доступного процессорного времени одного ядра. Для наглядности ограничим наше приложение 20%

Так как приложение на Go, необходим Dockerfile, с помощью которого оно будет заворачиваться в контейнер

# возьмем один из самых свежих образов Go
FROM golang:1.19.0-alpine3.16

# alpine образ беден на стандартные инструменты, добавим их отдельной командой
RUN apk update && apk upgrade && \
    apk add --no-cache bash git openssh

# задаем директорию приложения в контейнере
WORKDIR /app

# скачиваем в контейнер зависимости приложения
COPY go.mod ./
RUN go mod download

# копируем файлы текущей директории в контейнер
COPY . .

# билдим приложение
RUN go build -o main .

# так как приложение читает 8890 порт, откроем его внешнему миру
EXPOSE 8890

# команда запуска приложения в контейнере
CMD ["./main"]
Приложение, которое выполняет CPU работу
package main

import (
	"encoding/json"
	"math/rand"
	"net/http"
)

func main() {
	http.HandleFunc("/handle", func(writer http.ResponseWriter, request *http.Request) {
		// cpu intensive work
		for i := 0; i <= 1000; i++ {
			_, _ = json.Marshal(randSeq(10))
		}

		writer.Header().Add("Content-Type", "application/json")
		writer.Write([]byte("done!"))
	})

	_ = http.ListenAndServe(":8890", nil)
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// random string generator
func randSeq(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

Теперь сформируем docker-compose
version: '3.9'

services:
  app:
    build:
      context: ../
      dockerfile: Dockerfile
    ports:
      - "8870:8890"
    restart: unless-stopped
    networks:
      - backend
    deploy:
      resources:
        limits:
          cpus: '0.2'
          memory: 500M

networks:
  backend:

Так как docker-compose.yml находится по пути ./one-instance/docker-compose.yml, а Dockerfile на уровень выше, то необходимо задать контекст ../

Запустим приложение командой

docker-compose up --build --force-recreate
В консоли будет примерно следующий лог
davidagadzhanyan@MacBook-Pro one-instance % docker-compose up --build --force-recreate
WARNING: Found orphan containers (one-instance_app1_1) for this project. If you removed or renamed this service in your compose file, you can run this command with the --remove-orphans flag to clean it up.
Building app
[+] Building 2.6s (12/12) FINISHED                                                                                                                                                                        
 => [internal] load build definition from Dockerfile                                                                                                                                                 0.0s
 => => transferring dockerfile: 37B                                                                                                                                                                  0.0s
 => [internal] load .dockerignore                                                                                                                                                                    0.0s
 => => transferring context: 2B                                                                                                                                                                      0.0s
 => [internal] load metadata for docker.io/library/golang:1.19.0-alpine3.16                                                                                                                          1.5s
 => [1/7] FROM docker.io/library/golang:1.19.0-alpine3.16@sha256:0eb08c89ab1b0c638a9fe2780f7ae3ab18f6ecda2c76b908e09eb8073912045d                                                                    0.0s
 => [internal] load build context                                                                                                                                                                    0.0s
 => => transferring context: 1.38kB                                                                                                                                                                  0.0s
 => CACHED [2/7] RUN apk update && apk upgrade &&     apk add --no-cache bash git openssh                                                                                                            0.0s
 => CACHED [3/7] WORKDIR /app                                                                                                                                                                        0.0s
 => CACHED [4/7] COPY go.mod ./                                                                                                                                                                      0.0s
 => CACHED [5/7] RUN go mod download                                                                                                                                                                 0.0s
 => [6/7] COPY . .                                                                                                                                                                                   0.0s
 => [7/7] RUN go build -o main .                                                                                                                                                                     0.8s
 => exporting to image                                                                                                                                                                               0.1s
 => => exporting layers                                                                                                                                                                              0.1s
 => => writing image sha256:0166bc8bb315e491a69964bc68ff7a125dc0a8dd9c5865c3970fb2bc428a5ba0                                                                                                         0.0s
 => => naming to docker.io/library/one-instance_app                                                                                                                                                  0.0s

Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them
Recreating one-instance_app_1 ... done
Attaching to one-instance_app_1

Нас интересуют две последние строчки, которые говорят о том, что приложение успешно запущено.

Теперь проведем нагрузочное тестирование с помощью утилиты wrk. Для чистоты проведем его в среде докера. Находим образ с этой утилитой в хабе.

Для того чтобы запущенный контейнер wrk смог достучаться до приложения, при запуске присоединим его к сети backend.

Докер имеет свои правила формирования названия сети. Найти точное название сети можно выполнив команду.

docker network ls

Результатом команды будет список созданных докером сетей.

NETWORK ID     NAME                            DRIVER    SCOPE
657a365c5cdd   one-instance_backend            bridge    local

Видно название сети one-instance_backend

Отлично, теперь проведем нагрузочное тестирование при одном потоке и одном соединении повторим при трех.

docker run --network=one-instance_backend \
	--rm skandyla/wrk -t1 -c1 -d5s http://app:8890/handle
  
docker run --network=one-instance_backend \
	--rm skandyla/wrk -t3 -c3 -d5s http://app:8890/handle 

Видим следующие результаты

Running 5s test @ http://app:8890/handle
  1 threads and 1 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    22.55ms   28.32ms  91.44ms   78.73%
    Req/Sec   169.56     47.70   410.00     78.00%
  854 requests in 5.05s, 93.41KB read
Requests/sec:    169.08
Transfer/sec:     18.49KB

Running 5s test @ http://app:8890/handle
  3 threads and 3 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    45.85ms   42.16ms 195.06ms   31.70%
    Req/Sec    25.06     15.04    99.00     70.75%
  377 requests in 5.06s, 41.23KB read
Requests/sec:     74.56
Transfer/sec:      8.16KB

Теперь создадим три реплики приложения, поставим перед ними nginx для балансировки нагрузки и проведем аналогичное нагрузочное тестирование.

Сформируем конфиг nginx и используем дефолтный способ балансировки round robin (поочередная отправка запросов серверам).

Подробнее про другие способы балансировки можно почитать в документации

upstream app {
    server app1:8890;
    server app2:8890;
    server app3:8890;
}

server {
    listen 8891;
    location / {
        proxy_pass http://app;
    }
}
Создадим новый docker-compose.yml
version: '3.9'

services:
  app1:
    build:
      context: ..
      dockerfile: Dockerfile
    ports:
      - "8881:8890"
    restart: unless-stopped
    networks:
      - backend
    deploy:
      resources:
        limits:
          cpus: '0.2'
          memory: 500M

  app2:
    build:
      context: ..
      dockerfile: Dockerfile
    ports:
      - "8882:8890"
    restart: unless-stopped
    networks:
      - backend
    deploy:
      resources:
        limits:
          cpus: '0.2'
          memory: 500M

  app3:
    build:
      context: ..
      dockerfile: Dockerfile
    ports:
      - "8883:8890"
    restart: unless-stopped
    networks:
      - backend
    deploy:
      resources:
        limits:
          cpus: '0.2'
          memory: 500M

  ngnix-server:
    image: nginx:latest
    ports:
      - "8891:8891"
    volumes:
      - /Users/davidagadzhanyan/go/src/highload-patterns/scale-stateless/multiple-with-lb/nginx.conf:/etc/nginx/conf.d/default.conf
    networks:
      - backend

networks:
  backend:

networks:
  backend:

Обратите внимание, что для монтирования одного файла, необходимо указывать абсолютный путь к нему

Проведем нагрузочное тестирование еще раз командами

docker run --network=multiple-with-lb_backend 
	\ --rm skandyla/wrk -t1 -c1 -d5s http://ngnix-server:8891/handle
  
docker run --network=multiple-with-lb_backend 
	\ --rm skandyla/wrk -t3 -c3 -d5s http://ngnix-server:8891/handle

Видим следующий лог

Running 5s test @ http://ngnix-server:8891/handle
  1 threads and 1 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     9.09ms   13.99ms  68.66ms   82.25%
    Req/Sec   451.82     93.72     1.02k    92.00%
  2251 requests in 5.00s, 347.31KB read
Requests/sec:    450.16
Transfer/sec:     69.46KB

Running 5s test @ http://ngnix-server:8891/handle
  3 threads and 3 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    22.41ms   26.86ms  95.08ms   78.73%
    Req/Sec   109.03     33.69   250.00     76.00%
  1631 requests in 5.01s, 251.66KB read
Requests/sec:    325.80
Transfer/sec:     50.27KB

Соберем воедино и сравним результаты

RPS

Latency

one instance (1t, 1c)

169

22.55ms

one instance (3t, 3c)

74

45.85ms

three instances + lb (1t, 1c)

450

9.09ms

three instances + lb (3t, 3c)

325

22.41ms

Как ожидалось, путем горизонтального масштабирования удалось кратно увеличить как пропускную способность RPS, так и уменьшить latency.

Подробнее про балансировку можно почитать system-design-primer

Плюсы

  • Один из плюсов, о котором ранее не говорилось, теперь сервис становится надежнее, так как в случае падения одной из реплик, есть другие, которые будут продолжать выполнять полезную работу. Теперь сервис не является единой точкой отказа.

  • Вторым плюсом является то, что поверх контейнеров можно создать механизм, который мог бы сам автоматически добавлять/удалять реплики в зависимости от текущей нагрузки. Этим и другими полезными механизмами оркестрации занимается kubernetes и другие его соотечественники вроде Docker Swarm и другие.

Минусы

  • Подходит только для stateless приложений, которые не хранят состояния пользователей, то есть запрос может быть обработан любой репликой.

  • Избавившись от единой точки отказа в виде одного экземпляра приложения, появляется единая точка отказа в виде балансировщика nginx, но никто не мешает и его отреплицировать и использовать более высокоуровневую балансировку.

Sync to async

sync processing
sync processing

Есть приложение, в котором активно растет трафик и помимо основных, оно выполняет еще и вспомогательные функции, которые с ростом трафика начинают влиять на пользовательский опыт. Так почему не выполнять вспомогательные функции асинхронно?

Пример: есть приложение, в котором фиксируются пользовательские события кликов и любых других действий. Количество необходимых для фиксирования событий постепенно растет, как и количество пользователей. В какой-то момент эта механика начинает сказываться на времени выполнения запроса пользователя

Концепт: вынести фиксацию событий в асинхронный режим через отправку сообщений в шину данных и затем разгружать эту очередь отдельным сервисом

async processing
async processing

Реализация: примеры кода ниже, но они также доступны в github

Сымитируем прямой поход в сервис логов, проведем нагрузочное тестирование, затем добавим обработку через шину данных и сравним результаты

Код приложения, которое будет выполнять CPU работу и отправлять запрос в сервис логов
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"math/rand"
	"net/http"
)

func main() {
	http.HandleFunc("/handle", func(writer http.ResponseWriter, request *http.Request) {
		// cpu intensive work
		for i := 0; i <= 1000; i++ {
			_, _ = json.Marshal(randSeq(10))
		}

		// send log
		resp, err := http.DefaultClient.Post(
			"http://logs:8890/sendEvent",
			"application/json",
			bytes.NewReader([]byte(randSeq(100))),
		)
		if err != nil {
			writer.Header().Add("Content-Type", "application/json")
			writer.Write([]byte("failed!" + err.Error()))
			return
		}

		body, err := io.ReadAll(resp.Body)
		if err != nil {
			writer.Header().Add("Content-Type", "application/json")
			writer.Write([]byte("failed!" + err.Error()))
			return
		}

		writer.Header().Add("Content-Type", "application/json")
		writer.Write([]byte("done!" + string(body)))
	})

	_ = http.ListenAndServe(":8890", nil)
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// Random string generator
func randSeq(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

Код приложения, которое для имитации CPU работы будет конвертировать строку в json
package main

import (
	"encoding/json"
	"math/rand"
	"net/http"
)

func main() {
	http.HandleFunc("/sendEvent", func(writer http.ResponseWriter, request *http.Request) {
		// cpu intensive work
		for i := 0; i <= 1000; i++ {
			_, _ = json.Marshal(randSeq(50))
		}

		writer.Header().Add("Content-Type", "application/json")
		writer.Write([]byte("event is saved!"))
	})

	_ = http.ListenAndServe(":8890", nil)
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// Random string generator
func randSeq(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

Соберем docker-compose.yml
version: '3.9'

services:
  app:
    build:
      context: ./app
      dockerfile: ../../Dockerfile
    ports:
      - "8861:8890"
    restart: unless-stopped
    networks:
      - backend
    deploy:
      resources:
        limits:
          cpus: '0.2'
          memory: 500M

  logs:
    build:
      context: ./log
      dockerfile: ../../Dockerfile
    ports:
      - "8862:8890"
    restart: unless-stopped
    networks:
      - backend
    deploy:
      resources:
        limits:
          cpus: '0.2'
          memory: 500M

networks:
  backend:

Запустим приложения командой

docker-compose up --build --force-recreate
Увидим примерно такой лог
davidagadzhanyan@MacBook-Pro sync % docker-compose up --build --force-recreate
Building app
[+] Building 2.8s (12/12) FINISHED                                                                                                                                                                        
 => [internal] load build definition from Dockerfile                                                                                                                                                 0.0s
 => => transferring dockerfile: 37B                                                                                                                                                                  0.0s
 => [internal] load .dockerignore                                                                                                                                                                    0.0s
 => => transferring context: 2B                                                                                                                                                                      0.0s
 => [internal] load metadata for docker.io/library/golang:1.19.0-alpine3.16                                                                                                                          1.6s
 => [1/7] FROM docker.io/library/golang:1.19.0-alpine3.16@sha256:0eb08c89ab1b0c638a9fe2780f7ae3ab18f6ecda2c76b908e09eb8073912045d                                                                    0.0s
 => [internal] load build context                                                                                                                                                                    0.0s
 => => transferring context: 1.86kB                                                                                                                                                                  0.0s
 => CACHED [2/7] RUN apk update && apk upgrade &&     apk add --no-cache bash git openssh                                                                                                            0.0s
 => CACHED [3/7] WORKDIR /app                                                                                                                                                                        0.0s
 => [4/7] COPY go.mod ./                                                                                                                                                                             0.0s
 => [5/7] RUN go mod download                                                                                                                                                                        0.2s
 => [6/7] COPY . .                                                                                                                                                                                   0.0s
 => [7/7] RUN go build -o main .                                                                                                                                                                     0.7s
 => exporting to image                                                                                                                                                                               0.1s
 => => exporting layers                                                                                                                                                                              0.1s
 => => writing image sha256:fc18d50b8af7c9775b9911c32d4c230fcd78025f0f5505e7ddab8d464d77fef9                                                                                                         0.0s
 => => naming to docker.io/library/sync_app                                                                                                                                                          0.0s

Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them
Building logs
[+] Building 1.4s (12/12) FINISHED                                                                                                                                                                        
 => [internal] load build definition from Dockerfile                                                                                                                                                 0.0s
 => => transferring dockerfile: 37B                                                                                                                                                                  0.0s
 => [internal] load .dockerignore                                                                                                                                                                    0.0s
 => => transferring context: 2B                                                                                                                                                                      0.0s
 => [internal] load metadata for docker.io/library/golang:1.19.0-alpine3.16                                                                                                                          0.3s
 => [internal] load build context                                                                                                                                                                    0.0s
 => => transferring context: 140B                                                                                                                                                                    0.0s
 => [1/7] FROM docker.io/library/golang:1.19.0-alpine3.16@sha256:0eb08c89ab1b0c638a9fe2780f7ae3ab18f6ecda2c76b908e09eb8073912045d                                                                    0.0s
 => CACHED [2/7] RUN apk update && apk upgrade &&     apk add --no-cache bash git openssh                                                                                                            0.0s
 => CACHED [3/7] WORKDIR /app                                                                                                                                                                        0.0s
 => [4/7] COPY go.mod ./                                                                                                                                                                             0.0s
 => [5/7] RUN go mod download                                                                                                                                                                        0.2s
 => [6/7] COPY . .                                                                                                                                                                                   0.0s
 => [7/7] RUN go build -o main .                                                                                                                                                                     0.7s
 => exporting to image                                                                                                                                                                               0.1s
 => => exporting layers                                                                                                                                                                              0.1s
 => => writing image sha256:9e13068dcf77077505181c536b68b6e182e7df7f4abff2eb52f20f6e23981d8c                                                                                                         0.0s
 => => naming to docker.io/library/sync_logs                                                                                                                                                         0.0s

Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them
Recreating sync_logs_1 ... done
Recreating sync_app_1  ... done
Attaching to sync_logs_1, sync_app_1

Запустим нагрузочное тестирование командами

docker run --network=sync_backend \
	--rm skandyla/wrk -t1 -c1 -d5s http://app:8890/handle
  
docker run --network=sync_backend \
	--rm skandyla/wrk -t3 -c3 -d5s http://app:8890/handle

И увидим результаты

Running 5s test @ http://app:8890/handle
  1 threads and 1 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    28.20ms   29.85ms  90.45ms   77.06%
    Req/Sec    56.42     13.52   117.00     66.00%
  283 requests in 5.02s, 35.38KB read
Requests/sec:     56.35
Transfer/sec:      7.04KB

Running 5s test @ http://app:8890/handle
  3 threads and 3 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   105.07ms   64.37ms 204.37ms   54.17%
    Req/Sec    10.07      4.48    20.00     65.49%
  144 requests in 5.05s, 18.00KB read
Requests/sec:     28.53
Transfer/sec:      3.57KB

Видим, что пропускная способность 56rps и 28rps при задержке 28ms и 105ms.

Теперь вынесем обработку сервиса логов в асинхронный режим. В качестве шины данных используем Apache Kafka. Основное приложение будет писать сообщения в кафку, а сервис логов будет вычитывать эти сообщения.

В этой статье мы не будем останавливаться на технических деталях Apache Kafka, почитать про нее подробнее можно в официальной документации

Также вместе с Apache Kafka часто используется Apache Zookeeper для мониторинга/синхронизации брокеров/топиков/сообщений

Добавим оба этих инструмента в docker-compose.yml
version: '3.9'

services:
  app:
    build:
      context: ./app
      dockerfile: ../../Dockerfile
    ports:
      - "8851:8890"
    restart: always
    networks:
      - backend
    depends_on:
      - zookeeper
      - broker
    deploy:
      resources:
        limits:
          cpus: '0.2'
          memory: 500M

  logs:
    build:
      context: ./log
      dockerfile: ../../Dockerfile
    ports:
      - "8852:8890"
    restart: always
    networks:
      - backend
    depends_on:
      - zookeeper
      - broker
    deploy:
      resources:
        limits:
          cpus: '0.2'
          memory: 500M

  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    restart: always
    networks:
      - backend
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    restart: always
    ports:
      - "9092:9092"
    networks:
      - backend
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

networks:
  backend:

Основное приложение, которое будет писать сообщения в кафку
package main

import (
	"context"
	"encoding/json"
	"log"
	"math/rand"
	"net/http"
	"os"

	"github.com/segmentio/kafka-go"
)

func main() {
	log.SetOutput(os.Stdout)

	// kafka topic
	topic := "my-topic"
	partition := 0

	// create kafka connection
	conn, err := kafka.DialLeader(context.Background(), "tcp", "broker:29092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	http.HandleFunc("/handle", func(writer http.ResponseWriter, request *http.Request) {
		// cpu intensive work
		for i := 0; i <= 1000; i++ {
			_, _ = json.Marshal(randSeq(10))
		}

		// produce event to kafka
		_, err := conn.WriteMessages(
			kafka.Message{Value: []byte(randSeq(100))},
		)
		if err != nil {
			log.Println(err)
		}

		writer.Header().Add("Content-Type", "application/json")
		writer.Write([]byte("done!"))
	})

	_ = http.ListenAndServe(":8890", nil)
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// Random string generator
func randSeq(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

Приложение, которое будет вычитывать сообщения из кафки
package main

import (
	"context"
	"encoding/json"
	"log"
	"math/rand"
	"os"

	"github.com/segmentio/kafka-go"
)

func main() {
	log.SetOutput(os.Stdout)

	// kafka topic
	topic := "my-topic"
	partition := 0

	// create kafka connection
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"broker:29092"},
		Topic:     topic,
		Partition: partition,
	})

	for {
		// read message from kafka
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Println(err)
			break
		}

		// cpu intensive work
		for i := 0; i <= 1000; i++ {
			_, _ = json.Marshal(randSeq(50))
		}

		log.Print(m.Offset, string(m.Key), string(m.Value))
	}
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// Random string generator
func randSeq(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

Запустим нагрузочное тестирование командами

docker run --network=async_backend \
	--rm skandyla/wrk -t1 -c1 -d5s http://app:8890/handle

docker run --network=async_backend \
	--rm skandyla/wrk -t3 -c3 -d5s http://app:8890/handle

Видим следующий лог

Running 5s test @ http://app:8890/handle
  1 threads and 1 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    21.24ms   26.03ms  89.19ms   78.94%
    Req/Sec   121.16     36.59   316.00     86.00%
  610 requests in 5.05s, 66.72KB read
Requests/sec:    120.80
Transfer/sec:     13.21KB

Running 5s test @ http://app:8890/handle
  3 threads and 3 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    40.31ms   37.91ms 103.10ms   68.19%
    Req/Sec    29.21     13.37    90.00     83.33%
  441 requests in 5.03s, 48.23KB read
Requests/sec:     87.66
Transfer/sec:      9.59KB

Соберем воедино результаты и сравним

RPS

Latency

sync (1t, 1c)

56

28.20ms

sync (3t, 3c)

28

105.07ms

async (1t, 1c)

120

21.24ms

async (3t, 3c)

87

40.31ms

Видим, что асинхронная обработка увеличила RPS и при этом сократила latency, так как теперь не нужно ждать выполнения запроса внешним сервисом.

Подробнее про асинхронную обработку можно почитать system-design-primer

Плюсы

  • Очевидный плюс в том, что теперь сложная работа выполняется асинхронно, тем самым увеличивается пропускная способность приложения

  • Можно в зависимости от накопленного количества сообщений в очереди добавлять/удалять реплики приложения-потребителя

Минусы

  • Для асинхронного выполнения подойдут тяжелые операции, не требующие мгновенной реакции

  • Появляется еще одна точка отказа в виде шины данных, которую необходимо поддерживать

SQL to NoSQL

(no) sql
(no) sql

Есть приложение, в котором есть одно хранилище под все и, как правило, это какая-нибудь реляционная база данных. Уезжать из него очень не хочется, но пользовательский трафик растет с такой скоростью, что поддержание одного такого хранилища обходится дорого. Что же делать?

Пример: допустим есть приложение и в качестве хранилища данных используется postgres. Количество пользовательских событий/данных/логов стремительно растет. Таблица с этими данными разбухает и запись в нее замедляется, как и чтение из нее. Сначала мы дробим данные на таблицы по месяцам вроде activities_01072022, activities_01082022, но и этот подход нас перестает устраивать, так как выгрузить полную историю пользователей требует большого количества джойнов по таблицам. Впереди ждет еще больше данных, ожидается, что сервис по праву будет как write-heavy, так и read-heavy. Вынесем эти события в эффективное для таких задач nosql хранилище данных.

Концепт: как большинство крупных систем, данные делятся примерно на два типа. Первые - это те, которые критически важно держать в согласованности, любые операции над ними требуют ACID свойств базы данных. Пример таких данных - финансовые операции/транзакции/бонусные счета. К таким базам относятся mysql, postgres и другие. С другой стороны, существуют nosql базы данных, которые за счет ограничений дают эффективность при выполнении специальных типов задач. Например, redis хорошо подходит для кеширования данных, так как они находятся в RAM и за счет этого он имеет высокую пропускную способность с низким latency. Не будем зацикливаться на отдельных хранилищах, как и в программировании в целом, хранилища также имеют свою кухню со своими блюдами, где все зависит от потребностей.

Реализация: примеры кода ниже, но они также доступны в github.

В качестве примера возьмем приложение, которое будет на запрос пользователя генерировать данные и записывать их в хранилище, В качестве демонстрации разницы в пропускной способности возьмем коробочные postgres и cassandra.

Go приложение с postgres
package main

import (
	"database/sql"
	"encoding/json"
	"log"
	"math/rand"
	"net/http"
	"os"
	"time"

	"github.com/google/uuid"
	_ "github.com/lib/pq"
)

func main() {
	log.SetOutput(os.Stdout)

	connStr := "host=postgres user=postgres password=postgres dbname=postgres sslmode=disable"
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}

	http.HandleFunc("/handle", func(writer http.ResponseWriter, request *http.Request) {
		uid := uuid.New().String()
		blob, _ := json.Marshal(randSeq(100))

		err := db.QueryRowContext(
			request.Context(),
			`INSERT INTO activities(id, user_id, timestamp, data) VALUES($1, $2, $3, $4)`,
			uid,
			uuid.New().String(),
			time.Now(),
			string(blob),
		).Err()
		if err != nil {
			writer.Header().Add("Content-Type", "application/json")
			writer.Write([]byte("failed!" + err.Error()))
			return
		}

		writer.Header().Add("Content-Type", "application/json")
		writer.Write([]byte("saved! " + uid))
	})

	_ = http.ListenAndServe(":8890", nil)
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// Random string generator
func randSeq(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

Сформированный docker-compose.yml
version: '3.9'

services:
  app:
    build:
      context: .
      dockerfile: ../Dockerfile
    ports:
      - "8831:8890"
    restart: always
    networks:
      - backend
    depends_on:
      - postgres
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 2000M

  postgres:
    image: postgres:14.0
    volumes:
      - /Users/davidagadzhanyan/go/src/highload-patterns/sql-to-nosql/sql/schema.sql:/docker-entrypoint-initdb.d/10-init.sql
    networks:
      - backend
    environment:
      POSTGRES_DB: "postgres"
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "postgres"
    ports:
      - "5432:5432"
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 2000M

networks:
  backend:

Таблица, в которую будем писать данные

create table if not exists activities
(
    id varchar primary key,
    user_id varchar,
    timestamp timestamp,
    data jsonb
);
Аналогичное приложение с cassandra
package main

import (
	"encoding/json"
	"log"
	"math/rand"
	"net/http"
	"os"
	"time"

	"github.com/gocql/gocql"
)

func main() {
	log.SetOutput(os.Stdout)

	cluster := gocql.NewCluster("cassandra")
	cluster.Keyspace = "app"
	cluster.Consistency = gocql.Quorum
	cluster.Authenticator = gocql.PasswordAuthenticator{
		Username: "cassandra",
		Password: "cassandra",
	}

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	http.HandleFunc("/handle", func(writer http.ResponseWriter, request *http.Request) {
		uid := gocql.UUIDFromTime(time.Now())
		blob, _ := json.Marshal(randSeq(100))
		if err := session.Query(
			`INSERT INTO activities (id, user_id, timestamp, data) VALUES (?, ?, ?, ?)`,
			uid,
			gocql.UUIDFromTime(time.Now()),
			time.Now().Unix(),
			blob,

		).Exec(); err != nil {
			writer.Header().Add("Content-Type", "application/json")
			writer.Write([]byte("failed!" + err.Error()))
			return
		}

		writer.Header().Add("Content-Type", "application/json")
		writer.Write([]byte("saved! " + uid.String()))
	})

	_ = http.ListenAndServe(":8890", nil)
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// Random string generator
func randSeq(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

Сформированный docker-compose.yml
version: '3.9'

services:
  app:
    build:
      context: .
      dockerfile: ../Dockerfile
    ports:
      - "8841:8890"
    restart: always
    networks:
      - backend
    depends_on:
      - cassandra
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 1000M

  cassandra:
    image: docker.io/bitnami/cassandra:4.0
    networks:
      - backend
    environment:
      - CASSANDRA_SEEDS=cassandra
      - CASSANDRA_CLUSTER_NAME=cassandra-cluster
      - MAX_HEAP_SIZE=1000M
      - HEAP_NEWSIZE=1000M
      - CASSANDRA_PASSWORD_SEEDER=yes
      - CASSANDRA_PASSWORD=cassandra
    ports:
      - 9042:9042
    volumes:
      - cassandra_data:/bitnami
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 1000M

  #from netflix repo: https://github.com/Netflix/osstracker/blob/master/docker-compose.yml#L67
  cassandra-load-keyspace:
    container_name: cassandra-load-keyspace
    image: cassandra:4.0
    networks:
      - backend
    depends_on:
      - cassandra
    volumes:
      - /Users/davidagadzhanyan/go/src/highload-patterns/sql-to-nosql/nosql/schema.cql:/schema.cql
    command: /bin/bash -c "sleep 10 && echo loading cassandra keyspace && cqlsh cassandra -u cassandra -p cassandra -f /schema.cql"
    deploy:
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
        window: 60s

networks:
  backend:

volumes:
  cassandra_data:
    driver: local

Таблица, в которую будем писать данные

DROP KEYSPACE IF EXISTS app;

CREATE KEYSPACE app WITH replication = {
    'class': 'SimpleStrategy',
    'replication_factor': 1
    };

USE app;

CREATE TABLE IF NOT EXISTS activities
(
    id        uuid,
    user_id   uuid,
    timestamp timestamp,
    data      blob,
    PRIMARY KEY ((id))
) WITH comment = 'Table with all activities';

Проведем нагрузочное тестирование postgres командами

docker run --network=sql_backend \
	--rm skandyla/wrk -t1 -c1 -d5s http://app:8890/handle
  
docker run --network=sql_backend \ 
	--rm skandyla/wrk -t12 -c12 -d5s http://app:8890/handle

Видим результат

Running 5s test @ http://app:8890/handle
  1 threads and 1 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     2.50ms    2.91ms  31.13ms   88.86%
    Req/Sec   524.24    143.19   760.00     80.00%
  2611 requests in 5.01s, 385.02KB read
Requests/sec:    521.43
Transfer/sec:     76.89KB

Running 5s test @ http://app:8890/handle
  12 threads and 12 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    42.74ms   40.28ms 199.19ms   78.80%
    Req/Sec    27.36     16.36    90.00     56.57%
  1623 requests in 5.02s, 239.33KB read
Requests/sec:    323.08
Transfer/sec:     47.64KB

Проведем нагрузочное тестирование cassandra командами

docker run --network=nosql_backend \
	--rm skandyla/wrk -t1 -c1 -d5s http://app:8890/handle 

docker run --network=nosql_backend \
	--rm skandyla/wrk -t12 -c12 -d5s http://app:8890/handle

Видим результат

Running 5s test @ http://app:8890/handle
  1 threads and 1 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     2.81ms  806.39us  14.13ms   91.44%
    Req/Sec   357.46     53.13   505.00     82.00%
  1782 requests in 5.01s, 262.78KB read
  Socket errors: connect 0, read 0, write 0, timeout 1
Requests/sec:    355.75
Transfer/sec:     52.46KB

Running 5s test @ http://app:8890/handle
  12 threads and 12 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    18.74ms   22.30ms 177.19ms   80.58%
    Req/Sec   101.73     30.50   222.00     74.12%
  6103 requests in 5.03s, 0.88MB read
Requests/sec:   1213.30
Transfer/sec:    178.91KB

Соберем воедино и сравним результаты

RPS

Latency

postgres (1t, 1c)

521

2.50ms

postgres (12t, 12c)

323

42.74ms

cassandra (1t, 1c)

355

2.81ms

cassandra (12t, 12c)

1213

18.74ms

По результатам видим, что хоть и postgres выигрывает у cassandra на 1 потоке и 1 соединении, на более высоких уже сильно уступает.

Конечно, пример лишь для наглядности, в действительности оба этих хранилища по своему интересны и имеют в корне разные подходы к хранению данных.

Например, postgres - реляционная БД и хранит данных в виде строк, с ACID свойствами.

Cassandra же относится к семейству колоночных, эффективно сжимает тонны однотипных данных. Она отлично подходит если вы пишете тоннами данные (можно почитать например здесь каким образом удается добиться высокой пропускной способности при записи). Существует переписанный на C++ аналог ScyllaDB, говорят она еще быстрее

Postgres имеет инструменты для шардирования и master-master репликации, но тем не менее на поддержание согласованности уходит много дополнительных ресурсы и это сказывается на работе всего кластера. По теореме CAP такая БД относится к CA, у нее отсутствует эффективность работы при расщеплении данных по разным нодам.

Cassandra заточена под работу в режиме кластера и за счет consisting hashing + виртуальных нод практически безболезненно переживает добавление/удаление нод. Любая реплика может инициировать запись. По теореме CAP такие БД относятся к AP, то есть данные на нодах могут быть несогласованны.

Итог

Мы по верхам прошлись по трем подходам к масштабированию приложений. Каждую из этих тем конечно же можно раскрыть гораздо глубже. Надеюсь, что это статья была полезна начинающим инженерам как отправная точка. Буду признателен любым конструктивным замечаниям. Спасибо!

Комментарии (0)