Так что, раз все работодатели требуют, то нужно эти технологии изучить. Но начинать с прочтения всей документации от начала и до конца не очень интересно. На мой взгляд, продуктивнее прочитать введение, сделать рабочий прототип, поправить ошибки, столкнуться с проблемами, решить их. И вот после всего этого уже с пониманием читать документацию, или даже отдельную книжку.
Тех, кому интересно в короткий срок познакомиться с базовыми возможностями указанных продуктов, прошу читать дальше.
Учебно-тренировочная программа будет заниматься факторизацией чисел. Она будет состоять из генератора больших чисел, процессора чисел, очереди, колоночного хранилища и веб-сервера.
По ходу разработки будут применены следующие шаблоны проектирования:
Архитектура системы будет выглядеть так:
На картинке овалом обозначен шаблон проектирования «конвейер». Остановлюсь на нём подробнее.
Шаблон «конвейер» предполагает, что информация поступает в виде потока и обрабатывается этапами. Обычно существует некоторый генератор (источник информации) и один или несколько процессоров (обработчиков информации). В данном случае генератором будет программа на Go, помещающая в очередь случайные большие числа. А обработчиком (единственным) будет программа, забирающая данные из очереди, и проводящая факторизацию. На чистом Go этот шаблон довольно легко реализуется с помощью каналов (chan). Выше есть ссылка на мой Гитхаб с примером. Здесь же роль каналов будет исполнять очередь сообщений.
Шаблоны Fan-In — Fan-Out обычно используются вместе и применительно к Go означают распараллеливание вычислений с помощью горутин с последующим сведением результатов и передаче их, например, далее по конвейеру. Ссылка на пример также приведёна выше. Опять же, канал заменён на очередь, горутины остались на месте.
Теперь пару слов об Apache Kafka. Kafka — это система управления сообщениями, обладающая отличными средствами кластеризации, использующая транзакционный журнал (точь в точь как в РСУБД) для хранения сообщений, и поддерживающая одновременно и модель очереди и модель издатель/подписчик. Последнее достигается за счёт групп получателей сообщений. Каждое сообщение получает только один член группы (параллельная обработка), но при этом сообщение будет доставлено по одному разу в каждую группу. Таких групп, как и получателей внутри каждой группы, может быть много.
Для работы с Kafka я буду использовать пакет «github.com/segmentio/kafka-go».
Redis же представляет собой колоночную БД типа «ключ-значение» в памяти, поддерживающую возможность постоянного хранения данных. Основной тип данных для ключей и значений — строки, но есть и некоторые другие. Redis считается одной из самых быстрых (или самой) БД в своём классе. В ней хорошо хранить всяческую статистику, метрики, потоки сообщений и т.д.
Для работы с Redis я буду использовать пакет «github.com/go-redis/redis».
Поскольку данная статья — быстрый старт, то обе системы развернём с помощью Docker, используя готовые образы с DockerHub. Я использую docker-compose в Windows 10 в режиме контейнеров в ВМ Linux (автоматически созданная программой Docker ВМ) вот с таким вот файлом docker-compose.yml:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "Generated:1:1,Solved:1:1,Unsolved:1:1"
KAFKA_DELETE_TOPIC_ENABLE: "true"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
redis:
image: redis
ports:
- "6379:6379"
Сохраните этот файл, перейдите в каталог с ним и выполните:
docker-compose up -d
Должны скачаться и запуститься три контейнера: Kafka (очередь), Zookeeper (сервер конфигурации для Kafka) и (Redis).
Убедиться, что контейнеры работают можно с помощь команды:
docker-compose ps
Должно быть что-то вроде:
Name State Ports
--------------------------------------------------------------------------------------
docker-compose_kafka_1 Up 0.0.0.0:9092->9092/tcp
docker-compose_redis_1 Up 0.0.0.0:6379->6379/tcp
docker-compose_zookeeper_1 Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
Согласно yml-файлу, должны автоматически создаться три очереди, посмотреть их можно командой:
docker exec kafka-container_kafka_1 /opt/kafka_2.12-2.1.0/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
Должны быть очереди (topics — темы в терминах Kafka) Generated, Solved и Unsolved.
Генератор данных бесконечно записывает в очередь числа со случайной задержкой. Его код предельно прост. Убедиться в наличии сообщений в очереди Generated можно с помощью команды:
docker exec kafka-container_kafka_1 /opt/kafka_2.12-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Generated --from-beginning
Далее процессор — тут следует обратить внимание на распараллеливание обработки значений из очереди в следующем блоке кода:
var wg sync.WaitGroup
c := 0 //counter
for {
// создайм объект контекста с таймаутом в 15 секунд для чтения сообщений
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// читаем очередное сообщение из очереди
// поскольку вызов блокирующий - передаём контекст с таймаутом
m, err := r.ReadMessage(ctx)
if err != nil {
fmt.Println("3")
fmt.Println(err)
break
}
wg.Add(1)
// создайм объект контекста с таймаутом в 10 миллисекунд для каждой вычислительной горутины
goCtx, goCcancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer goCcancel()
// вызываем функцию обработки сообщения (факторизации)
go process(goCtx, c, &wg, m)
c++
}
// ожидаем завершения всех горутин
wg.Wait()
Поскольку чтение из очереди сообщений блокирует программу, то я создал объект context.Context с таймаутом в 15 секунд. Этот таймаут завершит работу программы, если очередь будет долго пустовать.
Также для каждой горутины, которая проводит факторизацию числа также задано максимальное время работы. Я хотел чтобы числа, которые удалось факторизовать, записывались в одну БД. А числа, которые факторизовать за отведённое время не удалось — в другую БД.
Для определения примерного времени использовался бенчмарк:
func BenchmarkFactorize(b *testing.B) {
ch := make(chan []int)
var factors []int
for i := 1; i < b.N; i++ {
num := 2345678901234
go factorize(num, ch)
factors = <-ch
b.Logf("\n%d раскладывется на %+v\n\n", num, factors)
}
}
Бенчмарки в Go являются разновидностями тестов и помещаются в файл с тестами. На основе этого замера было подобрано максимальное число для генератора случайных чисел. На моём компьютере часть чисел успевала разложиться на множители, а часть — нет.
Те числа, которые удавалось разложить, писались в БД №0, неразложенные числа — в БД №1.
Тут надо сказать, что в Redis нет схемы и таблиц в классическом понимании. По умолчанию СУБД содержит 16 баз данных, доступных программисту. Эти базы отличаются своими номерами — от 0 и до 15.
Ограничение времени для горутин в процессоре обеспечивалось использованием контекста и оператора select:
// собственно факторизация
go factorize(n, outChan)
var item data
select {
case factors = <-outChan:
{
fmt.Printf("\ngoroutine #%d, input: %d, factors: %+v\n", counter, n, factors)
item.Number = n
item.Factors = factors
err = storeSolved(item)
if err != nil {
fmt.Println("6")
log.Fatal(err)
}
}
case <-ctx.Done():
{
fmt.Printf("\ngoroutine #%d, input: %d, exited on context timeout\n", counter, n)
err = storeUnsolved(n)
if err != nil {
fmt.Println("7")
log.Fatal(err)
}
return nil
}
}
Это ещё один из типовых приёмов разработки на Go. Смысл его заключается в том, что оператор select перебирает каналы, и выполняет код, соответствующий первому активному каналу. В данном случае или горутина выдаст результат в свой канал, или закроется канал контекста с таймаутом. Вместо контекста можно использовать произвольный канал, который будет выполнять роль управляющего и обеспечивать принудительное завершение горутин.
Подпрограммы записи в БД выполняют команду выбора нужной БД (0 или 1) и записывают пары вида (число — множители) для разобранных чисел или же (число — число) для неразложенных чисел.
func storeSolved(item data) (err error) {
// переключаемся на БД 0
cmd := redis.NewStringCmd("select", 0)
err = client.Process(cmd)
b, err := json.Marshal(item.Factors)
err = client.Set(strconv.Itoa(item.Number), string(b), 0).Err()
return err
}
Последней частью будет веб сервер, который будет отображать список разложенных и неразложенных чисел в виде json. У него будет две конечных точки:
http.HandleFunc("/solved", solvedHandler)
http.HandleFunc("/unsolved", unsolvedHandler)
Обработчик http-запроса с получением данных из Redis и отдачей их в виде json выглядит так:
func solvedHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET")
w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
// выбираем БД №0 - разложенные числа
cmd := redis.NewStringCmd("select", 0)
err := client.Process(cmd)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// получаем все ключи из БД
keys := client.Keys("*")
var solved []data
var item data
// для каждого ключа получаем значение и добавляем в массив
for _, key := range keys.Val() {
item.Key = key
val, err := client.Get(key).Result()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
item.Val = val
solved = append(solved, item)
}
// десериализуем массив в JSON
err = json.NewEncoder(w).Encode(solved)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
Результат запроса по адресу: localhost/solved
[{
"Key": "1604388558816",
"Val": "[1,2,3,227]"
},
{
"Key": "545232916387",
"Val": "[1,545232916387]"
},
{
"Key": "1786301239076",
"Val": "[1,2]"
},
{
"Key": "698495534061",
"Val": "[1,3,13,641,165331]"
}]
Вот теперь можно углубиться в документацию и специализированную литературу. Надеюсь, что статья была полезна.
Прошу специалистов не полениться и указать на мои ошибки.
Комментарии (14)
hazard-2
22.02.2019 12:20Ни разу не быстрый старт!
После прочтения настроение попортилось и появилось отторжение к «Go + Apache Kafka + Redis».
Больше похоже на доку к коду(код не смотрел).
grSereger
22.02.2019 13:08Подскажите, пожалуйста, пробовали ли вы запускать несколько консьюмеров для того, чтобы распараллелить обработку сообщений?
DmitriyTitov Автор
22.02.2019 13:41Внутри процессора (обработчика) и так запускается множество (неограниченно) горутин, каждая обрабатывает свои данные.
Можно, конечно, ещё и несколько обработчиков запустить, должно быть ОК.
А вообще, для того чтоб ещё сильнее рапараллелить, нужно ещё и несколько разделов в очереди создать. Тогда уже на уровне Kafka будет параллельное обслуживание клиентов. Ну и обработчики должны подключаться к разным разделам (partition).grSereger
22.02.2019 13:58множество (неограниченно) горутин
вы как минимум упираетесь в ограничение по количеству ядер.
чтоб ещё сильнее рапараллелить, нужно ещё и несколько разделов в очереди создать
Тогда вам и писать придётся в разные партиции, т.е. продюсер распределять записи по партициям и добавлять/удалять партиции в этом случае будет очень не удобно
Я собственно говорю про тот кейс, когда продюсер генерирует сильно больше событий, нежели может обработать 1 консьюмер, и необходимо при увеличении потока уметь достаточно просто добавлять новые консьюмерыDmitriyTitov Автор
22.02.2019 14:07вы как минимум упираетесь в ограничение по количеству ядер.
на своём компьютере с процессором Ryzen я легко запускаю 1 000 000 горутин. столько ядер у меня нет :)
необходимо при увеличении потока уметь достаточно просто добавлять новые консьюмеры
чтобы получать сообщения из одного топика, несколько получателей (consumer) должны принадлежать к одной consumer group. тогда сообщения будут отдаваться им как бы параллельно. конечно, предполагается, что обработка сообщения требует значительно больше ресурсов, чем сам процесс полученияgrSereger
22.02.2019 14:56я, наверно, не так выразился — я про то, что запустив 1млн горутин вы не сделаете свою программу производительнее (скорее наоборот).
Что касается consumer group — я собственно к этому и вел, что кейс работы с кафкой интересен при возможности распараллеливания обработки между разными инстансами приложения, а не внутри 1го. Но для этого нужна совсем другая реализация.DmitriyTitov Автор
22.02.2019 15:40ясно, но тут распараллеливание что называется «искаропки». как, впрочем, и в других очередях.
можно запустить много получателей, тут вообще ничего не надо дописывать — просто запускаем много и всё. логика работы не поменяется. можете развернуть у себя и запустить скриптом 100 штук — всё займёт минут пять.
DmitriyTitov Автор
22.02.2019 16:01Хотя может я чего не понял. Поясните про другую реализацию. В чём отличия будут?
lukashes
23.02.2019 15:05Привет, если у вас один инстанс консьюмера, то это будет единой точкой отказа. Распараллеливание может давать не только скорость, но и отказоустойчивость.
DorianPeregrim
22.02.2019 16:13на своём компьютере с процессором Ryzen я легко запускаю 1 000 000 горутин. столько ядер у меня нет :)
Запускаете. Но это не означает что все 1 000 000 горутин будут исполняться в одно и то же время.
DmitriyTitov Автор
22.02.2019 16:19это очевидно. вообще, на практике, производительность процессора вас будет редко интересовать, обычно проблема будет в (по мере значимости):
1. подсистема хранения
2. сеть
3. оперативная память (перемещение данных из RAM в кэш процессора)
кроме того, различные блокирующие операции (доступ к защищённым секциям, ввод/вывод) позволят переключать выполнение горутин и создадут эффект параллельного исполнения кода
к тому же, переключение контекста в случае горутин не требует перехода user space — kernel space и осуществляется весьма быстро. стэк горутин — пара килобайт — легко помещается в кэш процессораbat
25.02.2019 09:49на практике, производительность процессора вас будет редко интересовать
не надо говорить за всех, cpu bound задач в полне хватает
Вам пытаются донести, что стартовать по горутине на сообщение так себе решение. так нормальный pipeline не построить. Нормальной практика — ограниченный пул горутин.
Olehor
Годно, продолжайте в том же духе!