Поймал как-то программист золотую рыбку, а она ему и говорит человеческим голосом: «Отпусти меня в синее море, я исполню любое твоё желание». Программист ЦПУшечкой поскрипел, да и выдал техзадание. «Построй мне», – говорит, – «систему обработки неидемпотентных запросов, да чтоб она была высокопроизводительной, масштабируемой, гибкой и отказоустойчивой!». Охнула сперва золотая рыбка, но взяла себя в плавники и молвила: «Не печалься, ступай себе домой, код написан, система развёрнута. Отпускай меня уже». Удивился программист: «Да ладно? Ну, сейчас проверю и отпущу». «Нет», – возражает рыбка – «пока ты проверяешь, я уж засохну, и всё волшебство исчезнет». Программист задумался: «Что же делать: сначала отпустить, а потом проверить, или сначала проверить, а потом сушёную рыбу к пиву получить?».

Мечта многих разработчиков – высокопроизводительная, масштабируемая, гибкая и устойчивая система обработки запросов. Казалось бы, для решения этой задачи есть мегабыстрая Apache Kafka (далее просто Kafka) в качестве брокера сообщений, супергибкий Python для реализации получателя/обработчика сообщений, и ещё какая-нибудь шустрая NoSQL база данных. По отдельности они все прекрасны, как три девицы под окном, но можно ли собрать из них один конвейер для обработки данных и не потерять их важные преимущества?

Семантики чтения: потерять нельзя повторить

Один из основополагающих вопросов, который в первую очередь придётся решить при строительстве гибкой и безотказной системы – выбор семантики чтения сообщений. Работу Kafka и её взаимодействие с консюмерами (т.е. приложениями, которые читают из Kafka сообщения) можно проиллюстрировать следующим примером. Рассмотрим некий проект с менеджером, распределяющим задачи от заказчика между своими сотрудниками (рис. 1).

Рис. 1
Рис. 1

Сотрудник уведомляет менеджера о факте решения задачи, но если он не уложится в заявленный срок, то менеджер должен перекинуть задачу на другого сотрудника. Таким образом, гарантируется решение задачи даже в том случае, если какой-то из сотрудников выбывает из игры. Однако, если последний всё же вернется к решению той же задачи, мы получим двойную работу.

В случае семантики «at least once» («не менее одного раза»), сотрудник сообщает менеджеру, что задача решена только тогда, когда она действительно решена. Плюс в том, что потеря задачи исключена, но при этом мы обязаны смириться с тем, что есть вероятность повторного решения задачи.

В случае семантики «at most once» («не более одного раза»), сотрудник берёт на себя всю ответственность за решение задачи, уведомляя менеджера соответствующим образом сразу же после её получения. В таком случае исключается вероятность делегирования менеджером этой же задачи другому сотруднику, но если первый исполнитель потерялся, то и задача теряется вместе с ним.

Ещё одна семантика – «exactly once» («ровно один раз») – предполагает, что решение задачи и уведомления о факте решения неотделимы друг от друга. Идеальный вариант – когда уведомление и является решением. Как, например, в формате: вопрос-ответ. Здесь исключены и повторы, и потери.

Очевидное – не значит оптимальное…

Как нам подсказывает Капитан Очевидность, в данной аналогии менеджер соответствует брокеру сообщений (в нашем случае, Kafka), заказчик – продюсеру, а сотрудник – консюмеру.

Рис. 2
Рис. 2

Консюмеры, реализованные на каком-то из языков программирования, считывают сообщения из брокера и обрабатывают их, записывая информацию в базу данных (рис. 2). Здесь, на Хабре, немало статей про устройство Kafka, поэтому в данной статье ограничимся лишь общим описанием, и в дальнейшем будем рассматривать конкретно реализацию консюмера на Python.

Продюсеры отправляют сообщения в Kafka в указанные ими топики – такие группы по интересам. Топики состоят из партиций – по сути, файлов, куда записываются сообщения с соблюдением порядка (рис. 3). У каждого сообщения свой оффсет – смещение в партиции. Консюмеры считывают сообщения из партиций, подключаясь к ним при установке соединения с Kafka. При этом, если они объединены в группу (consumer group), то члены одной группы не могут читать из одной партиции.

Рис. 3
Рис. 3

Консюмер подтверждает обработку сообщения, выполняя коммит, т.е. сдвигая текущий оффсет в партиции до оффсета следующего сообщения (рис. 4). После этого он может считать новое сообщение.

Рис. 4
Рис. 4

Как видно из примера с менеджером, проблема выбора семантики теряет свою актуальность, если консюмер можно реализовать как подсистему Kafka: уместив в одну транзакцию всю обработку сообщения и её подтверждение. В этом случае Kafka Streams обеспечивает, в том числе семантику «exactly-once», исключая повторы и потери. Однако здесь проблема в том, что невозможно вместить невмещаемое, каким логика консюмера зачастую и является, да ещё и не потерять при этом в скорости обработки. И тут уже остается выбирать между оставшимися двумя семантиками. Мы сейчас детально рассмотрим, что при этом нужно учесть, имея под рукой изящный, и богатый на готовые решения Python.

На первый взгляд семантика «at least once» выглядит предпочтительней: лучше обработать сообщение дважды, чем потерять его совсем. Но что, если нам нужно обрабатывать неидемпотентные запросы? К примеру, запрос на списание денег со счёта клиента для оплаты каких-то услуг. И волею Kafka это списание внезапно начинает повторяться и повторяться.

Поэтому, прежде чем делать выводы о применимости той или иной семантики, стоит рассмотреть саму ситуацию, приводящую к потерям сообщений, либо повторной обработке.

Рис. 5
Рис. 5

Как видно на рис. 5, сообщения 2 и 3 успели обработаться. Один из консюмеров подтвердил их обработку, а другой не успел, поскольку у него что-то пошло не так, и он аварийно перезапустился (рис. 6).

Рис. 6
Рис. 6

Когда какой-либо из обработчиков перестаёт подавать признаки жизни (не отправляет своевременно хартбиты в Kafka), Kafka выполняет ребаланс – перераспределение партиций между оставшимися в живых обработчиками. Это делается для того, чтобы не оставалось нечитаемых партиций, ведь поток сообщений от продюсеров не останавливается, и их нужно кому-то обрабатывать. Так осиротевшая партиция отдаётся единственному живому консюмеру (рис. 7).

Рис. 7
Рис. 7

В следующий момент времени второй консюмер перезагружается, переподключается к Kafka и запрашивает у неё новую задачу.  Для оптимального распределения нагрузки на обработчиков при подключении консюмера Kafka снова выполняет ребаланс.

Рис. 8
Рис. 8

При этом Kafka всё равно, какому консюмеру какую партицию отдавать, и это вполне стандартная ситуация, когда перераспределение партиций выполнилось как на рис. 8.

К чему это приводит? Консюмер, обработавший документ 5, пытается закоммитить новый оффсет, но происходит исключение, потому что партиция, к которой этот коммит относится, уже отдана другому консюмеру (рис. 9).

Рис. 9
Рис. 9

Чем больше консюмеров – тем выше вероятность таких некорректных коммитов и последующих исключений.

Мы должны обработать это исключение, чтоб избежать отключения консюмера. Теперь консюмер сможет прочитать новое сообщение из актуальной партиции, но велика вероятность, что оно уже было обработано ранее другими консюмерами. Просто они не успели закоммитить следующий оффсет до ребаланса. Поэтому мы получим повторную обработку сообщений, что для нас критично, если мы имеем дело с неидемпотентными запросами (рис. 10).

Рис. 10
Рис. 10

Помимо повторной обработки мы сталкиваемся ещё с двумя проблемами. Во-первых, это отсутствие гибкости: пока консюмер обрабатывает одно сообщение, он не может параллельно или асинхронно заняться другими (по крайней мере, из той же партиции), пока не подтвердит обработку сообщения, закоммитив новый оффсет. Во-вторых, мы перегружаем нашего менеджера – брокера сообщений, не имея возможности оперативно его разгрузить за счёт какого-либо дополнительного буфера, например, локального буфера обработчика. Ситуация становится особенно критичной в случае переполнения брокера.

Мы в ответе за тех, кого… прочитали

Посмотрим, что нам даст альтернативное решение – семантика «at most once». Теперь, как только консюмер считал сообщение из Kafka, он должен снять с брокера ответственность за это сообщение, сразу же выполнив коммит. Такой подход позволяет нам распараллелить обработку сообщения внутри консюмера, т.е. он может забирать из Kafka следующее сообщение, не дожидаясь окончания обработки предыдущих.

Сообщения поступают в топик, распределяясь по партициям. Главный процесс консюмера считывает сообщения из Kafka в локальный буфер, в роли которого в нашем случае выступает мультипроцессинговый пул Python с асинхронным выполнением задач. После чего сразу же выполняется коммит (рис. 11).

Рис. 11
Рис. 11

Из локального буфера сообщение уже вытягивается и выполняется кем-то из дочерних процессов консюмера. Таким образом, сообщения из Kafka быстро перетекают в локальные буферы и процессы консюмеров, снимая нагрузку с брокера (рис. 12).

Рис. 12
Рис. 12

Конечно, при таком подходе без дополнительных доработок консюмер может сразу считать всё содержимое партиции, на которую он подписан, а далее потерять свою производительность, зависнуть и упасть. Поэтому следует сразу предусмотреть ограничение на заполненность мультипроцессингового пула по размеру его кэша. Чтоб не нарушать инкапсуляцию, мы добавим новый класс пула с возможностью доступа к кэшу, унаследовав это класс от стандартного multiprocessing.pool.Pool.

import multiprocessing.pool as mp_pool


class LimitedMultiprocessingPool(mp_pool.Pool):
    def get_pool_cache_size(self):
        return len(self._cache)

Поместив в один класс всё необходимое для чтения сообщений из Kafka и передачи их в пул процессов для дальнейшей обработки, мы получим следующий прототип консюмера.

class MsgConsumer:
    def __init__(self, proc_fun, cfg):
        # Функция для обработки сообщения в дочернем процессе
        self.proc_fun = proc_fun
        # Клиент для чтения сообщений из Kafka
        self.consumer = kafka.KafkaConsumer(
            cfg.kafka_topic,
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            bootstrap_servers=cfg.servers,
            group_id=cfg.group_id,
            client_id=cfg.client_id,
            check_crcs=cfg.check_crcs,
            consumer_timeout_ms=cfg.consumer_timeout_ms,
            session_timeout_ms=cfg.session_timeout_ms,
            request_timeout_ms=cfg.request_timeout_ms,
            max_partition_fetch_bytes=cfg.max_partition_fetch_bytes
        )
        # Лимит на количество сообщений, единовременно находящихся в пуле
        self.pool_cache_limit = cfg.pool_cache_limit
        # Флаг управляемой остановки приложения
        self.stop_processing = False
        # Пул обработчиков сообщений
        self.pool = LimitedMultiprocessingPool(processes=cfg.pool_size)
        # Обеспечиваем возможность остановки приложения по SIGTERM
        signal.signal(signal.SIGTERM, self.set_stop_processing)

    def set_stop_processing(self, *args, **kwargs):
        self.stop_processing = True

    def handle_pool_cache_excess(self):
        while self.pool.get_pool_cache_size() >= self.pool_cache_limit:
            # Здесь можно предусмотреть sleep
            pass

    def main_loop(self):
        while not self.stop_processing:
            for msg in self.consumer:
                if self.stop_processing:
                    break
                try:
                    self.handle_pool_cache_excess()
                    self.consumer.commit()
                except kafka_errors.CommitFailedError:
                    # Отлавливаем редкий, но возможный случай исключения 
                    # при ребалансе
                    continue
                self.pool.apply_async(self.proc_fun, (msg,))

Сайд-эффекты, и как с ними бороться

Теперь о том, как разобраться с неотъемлемым спутником решений, основанных на семантике «at most once», – потерями сообщений при отключении консюмера, в котором эти сообщения обрабатываются. Чтоб избежать потерь в случае управляемого отключения (рис. 13), мы добавляем механизм штатного останова: получив сигнал на выключение, приложение перестаёт тянуть запросы из Kafka и завершается, только когда закончит обработку всех сообщений, находящихся в пуле.

Рис. 13
Рис. 13

Для реализации механизма штатного останова пополним наш класс MsgConsumer соответствующим методом (не забыв импортировать модуль multiprocessing), который должен вызываться в случае выхода из метода run.

    def graceful_shutdown(self):
        try:
            self.consumer.close()  # Останавливаем клиента Kafka
            self.pool.close()  # Предотвращаем добавление новых задач в пул
            graceful_shutdown_end = time.time() + self.graceful_shutdown_timeout
            while graceful_shutdown_end > time.time():
                active_child_proc_num = len(mp.active_children())
                if active_child_proc_num == 0:
                    break
                # Здесь можно предусмотреть sleep
            else:
                raise
        except Exception as ex:
            self.pool.terminate()
            raise ex
        finally:
            self.pool.join()

Радоваться, конечно, рано – может произойти аварийное отключение (рис. 14). Рассмотрим эту ситуацию детально.

Рис. 14
Рис. 14

Отключение консюмера, как мы помним, неизбежно приводит к ребалансу. Через какое-то время отключившийся консюмер перезагружается, переподключается к Kafka, запрашивает у неё новое сообщение, и снова возникает ребаланс. Вероятность некорректного коммита, который мы имели счастье наблюдать ранее, очень мала. Дело в том, что коммит делается практически сразу после считывания сообщения, и, чтобы произошло исключение, ребаланс должен произойти как раз в доли секунды между чтением и коммитом. Если исключение всё же случается (рис. 15), мы его обрабатываем и тянем из актуальной партиции новое сообщение.

Рис. 15
Рис. 15

При этом исключается возможность того, что оно уже было кем-то обработано ранее, т.к. при выбранной нами семантике повторы, в принципе, исключены. К этому, собственно, мы и стремились.

Вернёмся к проблеме потери сообщений, находившихся в мультипроцессинговом пуле в момент аварийного отключения консюмера. На этот случай у нас должна быть предусмотрена обратная связь по каждому сообщению в заявленный срок (рис. 16). Сообщения, по которым до таймаута отсутствует отклик, должны быть переотправлены продюсером повторно.

Рис. 16
Рис. 16

Выбор зависит от условий

Итак, давайте подведём итог. Решение на основе семантики «at most once» с использованием мультипроцессингового пула в условиях неидемпотентности запросов и возможности организации обратной связи между консюмером и продюсером даёт следующие возможности:

  • Во-первых, позволяет консюмерам обрабатывать сразу несколько сообщений параллельно или асинхронно;

  • Во-вторых, позволяет динамически подключать и отключать консюмеров с минимальной вероятностью исключения из-за ребаланса;

  • В-третьих, не требует затрат на предотвращение повторной обработки сообщений в случае ребаланса, т.к. повторы исключены;

  • В-четвёртых, позволяет быстрее разгрузить Kafka за счёт ресурсов подключаемого консюмера при масштабировании. В частности, благодаря тому же мультипроцессинговому пулу;

  • В-пятых, учитывая, что продюсер (он же заказчик, он же, к примеру, фронтенд) и так должен предусматривать возможность переотправки сообщений (например, если в сети произошёл сбой, и сообщение вообще не дошло до бэкенда), в консюмере оказывается достаточно минимальной логики предотвращения потерь на основе механизмов обработки исключений и штатного останова, чтобы вся система работала надёжно. Естественно, как уже было сказано, консюмер должен уведомлять продюсера об успешной обработке очередного сообщения.

Неожиданно данный подход – «at most once» решая сразу три проблемы, добавляет ещё и пару преимуществ. Конечно же, приведённое решение ни в коем случае не претендует на роль оптимальной универсальной стратегии на все случаи обработки данных. Например, если условия таковы, что предотвращать потери запросов необходимо исключительно средствами бэкенда – лучше избрать стратегию, основанную на семантике «at least once». Поэтому в общем случае, выбор стратегии далеко неоднозначен. Но там, где семантика «at most once» применима, она позволит в наиболее полной мере использовать важнейшее преимущество Kafka перед другими брокерами – скорость. В купе с другими преимуществами это позволит вам построить систему высокопроизводительной, масштабируемой, гибкой и отказоустойчивой… Прямо, как в сказке.

 …Решил программист всё-таки сперва золотую рыбку отпустить: жалко же, если такой уникальный экземпляр на закуску пойдёт. А не исполнит она желание – ничего, в следующий раз попадётся. Махнула рыбка хвостом и уплыла восвояси. Пошёл программист домой, проверил... Чудеса! Всё написано, настроено и работает, как надо. Ай да рыбка, не обманула! Порадовался программист, но недолго. Работать надо: хоть система и сказочная, а мэйнтенанса требует...

Made by: ведущий разработчик компании STM Labs Илья Орлов

Комментарии (14)


  1. RPG18
    06.11.2021 23:13
    +1

    А как обрабатывается недоступность БД?


    1. avtozavodetz Автор
      07.11.2021 00:27

      В случае недоступности БД обработка запроса, предусматривающего обращение к БД, завершится ошибкой, о чём будет уведомлён инициатор запроса. Отправку запроса, конечно, через какое-то время придётся повторить.


      1. RPG18
        07.11.2021 12:29

        А как долго продюсер ждет подтверждения и куда он их сохраняет, а обратная связь сделана через outbox?


        1. avtozavodetz Автор
          07.11.2021 22:06

          В нашем случае обратная связь реализована через Redis, куда продюсер записывает идентификатор и статус запроса. Консюмер должен обновить этот статус по завершении обработки. Если в течение часа (таймаут может отличаться для разных типов запросов) статус не поменяется, продюсер (или клиент, т.к. большинство запросов отслеживаются именно клиентами их отправившими) посчитает, что запрос завершился ошибкой, и клиент получит соответствующее уведомление. Поскольку Redis не связан напрямую с основной БД, говорить о том, что у нас поддерживается outbox, конечно, нельзя. Однако, замечу, что запись результатов обработки запроса в основную БД сопровождается записью в т.н. журнал запросов в этой же БД, что является дополнительным средством контроля повторной обработки запроса.


          1. RPG18
            08.11.2021 14:41

            Просто запись в БД и запись в Redis не одна атомарная операция. Мы записали в БД, пишем в Redis и ловим ошибку от Redis.


            1. avtozavodetz Автор
              08.11.2021 21:20

              Согласен! Поэтому у нас и предусмотрен журнал запросов в основной БД. Клиент, не получив подтверждения об успешной обработке запроса по истечении таймаута, сочтёт такой запрос обработанным с ошибкой и может его повторить. Но поскольку данные этого запроса уже есть в журнале, консюмер не станет обрабатывать его повторно, а просто попытается ещё раз актуализировать информацию в Redis.


              1. RPG18
                08.11.2021 22:43

                ааа, понятно своеобразный inbox pattern.


    1. xpress
      07.11.2021 11:56

      Для таких случаев можно использовать DLQ (Dead Letter Queue). Физически это такой же топик в Kafka, куда направляются необработанные сообщения.


      1. RPG18
        07.11.2021 12:24

        upd. перечитал статью и увидел обратную связь


      1. avtozavodetz Автор
        07.11.2021 22:08

        Да, как вариант) Но мы ограничились уведомлением об ошибке, фактически перенеся ответственность за дальнейшую судьбу необработанного сообщения туда, где это сообщение лучше всего сохранится до лучших времен (до последующей переотправки), а именно - на клиентскую часть.


  1. Mel
    07.11.2021 14:51
    +1

    Спасибо за статью, как раз хотелось пощупать кафку. А почему использовался мультипроцессинг пул, вместо тред пула?


    1. avtozavodetz Автор
      07.11.2021 22:10

      У нас больший процент времени обработки запроса составляют именно вычисления, нежели обращения к БД. Поэтому и применили мы для оптимизации именно средства обеспечения параллелизма, а не конкурентности. И остановились на старом добром multiprocessing.Pool.


  1. shabelski89
    07.11.2021 22:04

    Вот это было бы интересно подробнее, как реализовано в коде.

    На этот случай у нас должна быть предусмотрена обратная связь по каждому сообщению в заявленный срок


    1. avtozavodetz Автор
      07.11.2021 22:14

      О, это очень просто:

      redis_client.set(key, value)

      где key - это id запроса, а value включает код и статус результата обработки:) Статус запроса в Redis отслеживается либо продюсером, либо клиентом (через отдельные запросы к тому же продюсеру).