Всем привет, меня зовут Александр Бобряков. Я техлид в команде МТС Аналитики, занимаюсь Real-Time обработкой данных. Недавно мы начали использовать фреймворк Apache Flink. Эту технологию выбрали, так как она (в отличие от Apache Spark) относится к true-стримингу и позволяет строить обработку данных с минимальной задержкой. В качестве DI-фреймворка выбрали привычный нам Spring Framework.

Наша команда использует разные виды тестирования: Unit, интеграционное, E2E, ручное, автоматизационное, нагрузочное. Этой теме будет уделено особое внимание. Я расскажу, какие подходы и паттерны помогут быть уверенными в качестве того, что вы разработали.

Эти знания будут полезны любому Java-разработчику, знакомому со Spring. А начинающие Flink-специалисты найдут здесь примеры стандартных ошибок и пути их решения. Обратите внимание, что эти материалы писались, когда актуальной версией Flink была 1.17. В конце октября вышла версия 1.18, и она может отличаться некоторыми нюансами.

О чем будет цикл статей

Этот рассказ будет состоять из нескольких постов. В них мы будем разрабатывать и тестировать небольшие бизнес‑задачи. Рассмотрим пайплайн Kafka‑to‑Kafka, задачу дедупликации событий, использование таймеров и другие примеры. По мере выхода новых материалов прямо здесь будут появляться ссылки, чтобы вам было легче ориентироваться в постах.

Весь код содержится на моём GitHub. Чтобы было проще, для каждой статьи я сделал отдельную ветку. Посты будут связаны отсылками на абстракции из предыдущих частей. Но вообще мы рассмотрим в этих текстах отдельные темы, знания из которых можно применять в разных проектах независимо друг от друга.

Для начала вспомним (или разберёмся), как работает потоковая обработка данных. Если вы с этим уже знакомы, раздел ниже можно пропустить. Если хотите подтянуть знания или разобраться с нуля, смело заглядывайте под спойлер:

Введение в потоковую обработку данных

Подходы к обработке данных

Говоря о конвейере обработки данных, обычно выделяют два подхода: пакетную обработку и потоковую.

При пакетной обработке перед вычислением операций мы принимаем весь, обычно небольшой, набор данных. Это означает, что на входе можно вычислить статистику распределения данных в пакете, произвести агрегации или сортировку и так далее.

Сами пакеты данных можно получать как по количеству (например, 200 событий в пакете), так и по времени (например, раз в 20 минут). Такие системы проектируются, чтобы выполнять задачи в запланированное время. Поэтому их главный недостаток — высокая задержка (latency).

Потоковая обработка
Потоковая обработка

В потоковой обработке задержка минимальна., Это обусловлено влиянием, например, сетевых буферов. О такой системе можно говорить как о способной к Real-Time-обработке. Поток данных может никогда не закончиться (как кликстрим пользователей с сайтов), а приложение — проработать годами без остановки.

Графы потоков данных

Любую программу потоковой обработки нужно визуализировать, чтобы понимать, как перемещаются и изменяются данные. Для этого используют графы: логический и физический. Основными компонентами являются:

  • Source  — источник данных (файлы, сокеты, брокеры сообщений, БД…)

  • Operator — единица, выполняющая непосредственное вычисление (фильтрация, преобразование...)

  • Sink  — приемник выходных данных (файлы, сокеты, брокеры сообщений, БД…)

Все эти компоненты могут множество раз встречаться в одном графе.

Логический граф потока
Логический граф потока

Логический граф представляет собой «вид сверху». Он показывает, как реализован конвейер обработки «от и до», какие выполняются преобразования над данными.

Физический граф потока
Физический граф потока

Чтобы понять, как будет выполняться программа, надо составить физический граф. В распределенной системе каждый оператор может быть представлен несколькими задачами (Task) по заданному значению параллельности. Каждая из них является отдельным узлом графа. Соответственно, таски могут выполняться на разных физических машинах распределенного кластера, каждая из которых обрабатывает свою порцию данных.

Типы потоковой обработки

В мире потоковой обработки выделяют:

  1. Stateless  — обработка без сохранения состояния

  2. Stateful  — обработка с сохранением состояния

Stateless рассматривает каждое поступающее событие отдельно от остальных. Никакая промежуточная информация, которая могла бы зависеть от предыдущих событий не учитывается. Из преимуществ можно выделить простоту распараллеливания потока, а также быстрый перезапуск пайплайна обработки.

Но что делать, если нам нужно учитывать промежуточную статистику? Например, при дедупликации поступающих событий важно знать, встречали ли мы их двойников раньше. В этом случае нужно сохранять информацию между обработкой сообщений. Поэтому такой пайплайн является stateful.

В этом примере Operator_1 не имеет состояния. Он представляет собой обычный stateless-оператор. А вот Operator_2 имеет внутреннее локальное состояние. В него можно записать информацию о связке ключ-true для каждого уникального сообщения. Тогда в случае дедупликации мы поймем, что сообщение с таким ключом уже обрабатывалось.

Основная проблема — поддерживать доступность состояний, а также сохранять их данные, чтобы при сбое можно было восстановить приложение без потерь. Для этого есть механизмы периодического снятия снапшотов состояний. О них мы поговорим дальше.

Архитектура Apache Flink

Apache Flink — это платформа распределенной обработки данных с возможностью отслеживания состояния в потоке. Под капотом используется фреймворк Akka для межкомпонентной координации (начиная с версии 1.18 заменен на Apache Pekko).

В отличие от Spark с его пакетной и микробатчинговой обработками, Flink поддерживает потоковую обработку нативно. Он разрабатывался полностью под эту идеологию, хотя и батчинг тоже есть. Это позволяет Flink иметь достаточно высокую пропускную способность с низкой задержкой.

Платформа интегрируется со всеми основными распределенными менеджерами ресурсов. Но она может работать и в качестве автономного кластера. Также важно понимать, что Flink не обеспечивает надежного распределенного хранилища, это не его задача.

Рассмотрим чуть подробнее основные компоненты Flink.

На первом этапе клиент отправляет программу потоковой обработки данных (например, свой jar‑файл) диспетчеру заданий из кластера Flink. Это можно сделать через Flink UI, либо воспользовавшись командной строкой. Дальше в дело вступают следующие компоненты:

  • JobManager координирует весь кластер: принимает решение о запуске задач, действиях при сбое других компонентов, создает контрольные точки и так далее. Всегда существует как минимум один JobManager. В случае проблемы он выступит единой точкой отказа, но эта проблема решается через механизмы высокой доступности от Kubernetes или Apache Zookeeper.

  • TaskManager занимаются обработкой потока, буферизируют данные. Минимальной единицей планирования ресурсов тут является TaskSlot. Он представляет собой фиксированное подмножество ресурсов, которые не будут конкурировать друг с другом. В каждом таком слоте могут выполняться разные задания от распараллеленных операторов. Очень важно, что каждый TaskManager — это одна отдельная JVM.

Часть обязанностей Flink делегирует на внешние компоненты. Например, для управления TaskManager можно использовать менеджеры ресурсов с реализацией в виде Hadoop Yarn, Kubernetes, Apache Mesos.

Выполнение задачи в Flink

Давайте рассмотрим пример:

Слева изображен граф, в котором буквы определяют операторы. A и C — источники данных (source), E — приемник данных (sink), D и B — стандартные операторы, а цифры — это параллельность оператора. Её мы можем задавать программно.

В этом случае четыре задачи (Task) источника A распределяются на четыре слота в TaskManager 1 и TaskManager 2. Остальные операторы распределят свои задачи по слотам аналогичным образом согласно их уровню параллелизма.

При этом сами данные должны будут передаваться между операторами. На рисунке видно, что они могут перемещаться как внутри одного слота (с минимальной задержкой), так и между TaskManager в случае связи оператора C — D (это приведет к дополнительным затратам на сериализацию данных).

Каждый слот — это фиксированное подмножество ресурсов своего TaskManager.

TaskManager с двумя слотами будет выделять каждому из них 1/2 своей управляемой памяти. Благодаря этому конкуренции создаваться не будет. Но здесь не происходит изоляция ЦП; сейчас слоты разделяют только управляемую память задач.

Регулируя количество слотов, можно определить, как подзадачи будут изолированы друг от друга. Наличие одного слота на TaskManager означает, что каждая группа задач выполняется в отдельной JVM. Это уменьшит их взаимное влияние.

Управление состоянием

Как упоминалось ранее, Flink позволяет использовать состояние для операторов под написание Stateful-потоков обработки. При получении каждого события мы можем обратиться к состоянию и получить из него соответствующее значение.  Затем, при желании, обновить его или очистить.

Из коробки предоставляется несколько бэкенд-состояний:

  • Java Heap — всё состояние хранится в JVM Heap памяти TaskManager. Это позволяет иметь минимальную задержку взаимодействия. Из проблем — мы ограничены размером хипа, поэтому этот тип используется только для локального тестирования.

  • RocksDB — хранит данные в RocksDB, которая по умолчанию хранится в локальных каталогах данных TaskManager. Состояние сначала записывается в собственную память, а при достижении настроенного порога сбрасывается на локальные диски.

    В отличие от Java Heap, данные хранятся в виде сериализованных массивов байт. Их лучше использовать при очень больших размерах состояний. Причина этого в том, что единственное ограничение — объем дискового пространства кластера. С другой стороны, существуют затраты на постоянную де-/сериализацию.

  • External State — вы можете написать свою интеграцию с внешним хранилищем, куда будут сохраняться произвольные объекты. Часто встречаются интеграции с PG или Aerospike. В этом случае нужно позаботиться о поддержке механизма высокой доступности и чекпоинтах. Про них поговорим дальше.

Высокая доступность Flink

При правильной настройке Flink гарантирует, что если случится сбой,  со стороны вы заметите только небольшие задержки. В процессе работы распределенного Flink-кластера могут возникать два вида существенных сбоев:

1. Падение TaskManager

В этом случае JobManager просит ResourceManager предоставить больше слотов обработки для перезапуска всех задач, выполнявшихся на упавшем TaskManager. Если это невозможно, JobManager не сможет перезапустить приложение, пока не появится достаточное количество слотов.

2. Падение JobManager

По умолчанию на каждый Flink-кластер приходится по экземпляру JobManager. Это создает единую точку отказа (SPOF). В случае падения новые программы не запустятся, а все работающие завершатся сбоем.

Общая идея высокой доступности JobManager заключается в том, что в любое время существует один ведущий JobManager Leader и несколько резервных JobManager StandBy. В случае отказа лидера любой из них берет руководство на себя (как на рисунке выше).

Чтобы восстановить отправленные задания, Flink сохраняет их метаданные и артефакты (JAR, Graph…). Они не удаляются, пока их задание не завершится или не будет отменено. Как только это произойдет, все метаданные сотрутся.

Контрольные точки (checkpoints)

Checkpoint — это согласованная копия состояния каждой задачи потокового приложения в момент, когда все задачи отработали один и тот же вход (события в потоке). Чекпоинты используются для перезапуска приложения в случае сбоя. Если мы сознательно останавливаем его выполнение, по умолчанию все checkpoint удаляются.

Процесс создания контрольной точки можно описать так: JobManager в определенные моменты (конфигурируется программно) делает чекпоинты, отправляя в поток метку «checkpoint barrier».

При получении барьера контрольной точки все задачи источников записывают свои смещения и вставляют барьер в свой выходной поток.

Барьер проходит через весь граф заданий, инициализируя создание слепков состояний каждого stateful-оператора. Эти слепки сохраняются во внешнее устойчивое хранилище, а оператор отправляет JobManager дескриптор с информацией своего слепка.

JobManager поймёт, что все слепки готовы и checkpoint можно считать удачно созданным, когда барьер пройдет через весь граф обработки и каждый оператор сохранит свой слепок состояния, отправив дескриптор об этом в JobManager.

Бэкенды состояний (например, RocksDB) позволяют создавать снапшоты асинхронно, не останавливая обработку последующих за барьером событий.

По умолчанию контрольные точки хранятся в памяти JobManager. Для надлежащего сохранения большого состояния Flink поддерживает разные подходы к хранению чекпоинтов в других местах (HDFS, S3).

Восстановление с помощью механизма контрольных точек очень простое. При сбое Flink выбирает последнюю завершенную контрольную точку и еще раз разворачивает весь распределенный поток данных. Каждому оператору предоставляется состояние, которое было снято как часть последней контрольной точки по соответствующему дескриптору.

Но что, если не происходит падений, а мы изменили бизнес-логику в нашем потоковом приложении? Хотелось бы сделать его редеплой, восстановив состояние всех операторов. В этом случае чекпоинты нам не помогут (так как по умолчанию они удаляются после направленной остановки приложения). Нам поможет похожий механизм, называющийся точками сохранения (savepoints).

Точки сохранения (savepoint)

Savepoint — ещё один механизм создания моментального снимка работающего задания. Он создаётся с помощью контрольных точек Flink, но содержит дополнительные метаданные. Точки сохранения не создаются и не удаляются автоматически.

Их используются для:

  1. Перезапуска приложения — чтобы обновить или масштабировать.

  2. Перезапуска приложения на другом Flink‑кластере.

  3. Приостановки и дальнейшего запуска приложения — чтобы временно освободить ресурсы кластера.

Процесс использования при перезапуске приложения выглядит так:

  • Останавливаем приложение с созданием точки сохранения командой:

$ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
  • Запускаем приложение из точки сохранения:

$ bin/flink run -s :savepointPath [:runArgs]

Как правило, эволюция приложения вызвана новыми бизнес-требованиями или исправлением ошибок. Но что, если мы слишком сильно изменим наше приложение? В некоторых случаях Flink не сможет корректно сопоставить состояния при их восстановлении. На этот случай поделюсь парой советов.

Совместимость состояния обновленного Flink-приложения

Исходное приложение и его новая версия должны быть совместимы с точкой сохранения. Поэтому развивать приложение нужно соблюдая правила:

Если перезапустить приложение из точки сохранения, JobManager начнет сопоставлять с операторами их предыдущие состояния. Для этого он использует идентификатор каждого из операторов

Но это значение по умолчанию рассчитывается путем анализа всего графа. Если мы изменили граф (например, добавили новый оператор), то идентификатор всех последующих операторов в графе тоже изменится.

Если JobManager не находит оператора, для которого в точке сохранения записано состояние, система выдаст ошибку и приложение не запуститься. Пример такой ошибки можно увидеть в следующих логах:

В следующих статьях мы вернемся к описанному выше. Там я подробнее покажу подходы к написанию Java-кода в связке со Spring для реализации Flink-заданий.

В следующей статье подробно пройдемся по базовым шагам построения шаблона Flink-приложения со Spring и попробуем запустить его в Flink-кластере, развернутом в Docker. Спасибо, что прочитали!

Комментарии (5)


  1. navrotski
    10.11.2023 09:44

    Можно (и зачастую нужно) идентификаторы операторам назначать в коде. Они не изменятся при добавлении новых.


    1. appp_master Автор
      10.11.2023 09:44

      Да, именно так. Для stateful-заданий пользовательское указание идентификаторов для каждого оператора является скорее обязательным условием, так как рано или поздно столкнемся с эволюцией приложения.

      Более того такое назначение идентификаторов является одним из пунктов чеклиста "Production Readiness Checklist".

      В нашем проекте мы построили декораторы, заставляющие разработчиков указывать id при каждой трансформации потока. Если есть интерес, то могу рассказать отдельной статьей


  1. Anarchist
    10.11.2023 09:44
    +1

    Я сталкивался с задачами стриминга подобного рода, и нам показалось, что несколько секунд разницы между Spark и Flink не стоят того, чтобы добавлять новый стек. А у вас есть какие-то бенчмарки или примеры о заметном снижении эффективности Spark для таких задач?


    1. appp_master Автор
      10.11.2023 09:44

      К сожалению бенчмарки наша команда не делала, но мы полагались на опыт соседних команд, которые проводили сравнение под базовые задачи дедупликации на трафике в ~5-10млн rps. Flink показывал при этом лучшие результаты на одних и тех же ресурсах по latency. При этом нужно учитывать, что в наших задачах важен именно latency. Если сообщение "отдадим" слишком поздно, то оно уже никому не будет нужно.

      несколько секунд разницы между Spark и Flink не стоят того, чтобы добавлять новый стек

      поддерживаю, если у вас нет подобных требований. Spark может на подобных задачах выдавать latency более секунды, а Flink отработает шустрее благодаря внутренней архитектуре, ориентированной на true-стримминг.

      В сравнении с Flink я бы добавил необходимость для Spark очень тонкой настройки, для которой нужны опытные люди. В Flink эта настройка минимальна, а следовательно и порог вхождения ниже. С другой стороны в Spark появился экспериментальный режим Continuous Processing, который обещает минимальную задержку под at-least-once, но у нас нет опыта его использования (если вы использовали - интересно узнать реальные цифры/проблемы/кейсы).

      В дополнение могу порекомендовать доклад на SmartData 2023 "Spark Streaming: брать или не брать?", который как раз затрагивает задачи использования Spark, а когда все же лучше глянуть на другие инструменты.


    1. Fezzo
      10.11.2023 09:44

      Как минимум в задачах не требующих reduce-части (джойнов, агрегаций и т.д.) будет концептуальная разница. Например, если есть задача читать очень много данных из топика в кафке и писать их, скажем, в hdfs. В этом случае Spark будет запускать "микробатчи" на N тасков (в пределе по количеству партиций в топике) и тут начинаются проблемы. Из-за того, что микробатч - единая сущность, таски не смогут приступить к обработке следующих порций пока ВЕСЬ микробатч не завершится. А это значит, что завершенные таски будут просто стоять и ждать пока не завершатся все остальные. Что на больших объемах будет создавать совершенно лишние тормоза. И это если еще не брать в расчет выполнение в конкурентной среде или с динамическим выделением ресурсов - там ситуация будет еще хуже, т.к. разброс во времени выполнения каждого таска будет увеличиваться. Флинк же обеспечивает "нативный" стриминг, где таски друг друга не ждут