Привет! В этой статье мы рассмотрим важнейший аспект практически любого потокового приложения – работу со стейтом. Сегодня в роли подопытного выступит фреймворк Apache Flink.

Мы узнаем, как и где можно хранить стейт, какие структуры данных для этого используются, оценим скорость работы каждого подхода и узнаем, что из предложенных вариантов быстрее, а что – надежнее, и можно ли найти компромисс среди множества разноплановых комбинаций.

Небольшой дисклеймер

Подавляющая часть информации, представленной в этой статье, справедлива для всех релизов Apache Flink, начиная с версии 1.8. В версии 1.13 (последняя на момент выхода этой статьи) произошли небольшие правки API, которые в некоторой мере изменили видимую пользователю «оболочку» хранения стейта, но общие принципы остались прежними. Подробнее об этом можно прочитать здесь.

Если вы только начинаете знакомство с Apache Flink, то рекомендую посмотреть наш YouTube-митап по основам этого замечательного фреймворка.

Что такое стейт?

Стейт – важнейший и самый долгоживущий компонент любого приложения на Apache Flink. В идеальном случае он устойчив, нерушим и, фактически, вечен. Код приложения может бесконечно эволюционировать, обрастая новым функционалом, конфигурации кластера – расширяться и сжиматься в погоне за требованиями бизнеса, но все это никак не влияет на стейт. Гибкость работы с ним в плане эволюции схем и кода настолько высока, что стейт, накопленный в ходе многолетней работы приложения на старой версии Flink, может быть мигрирован на новую без каких-либо правок или дополнительных трудозатрат. Этот подход лежит в основе принципов развития Apache Flink, однако несколько усложняет разработку новых компонентов, например, модулей де/сериализации.

Существует несколько способов хранения стейта, так называемых state backend’ов. Каждый из них в равной мере имеет свои достоинства и недостатки. Важно знать, что конфигурирование state backend никак не затрагивает код приложения и выполняется независимо от его бизнес-логики.

Откуда берется стейт? Можно смело сказать, что стейт – это некоторая проекция агрегатов в контексте обработки бесконечного потока данных. Хорошими примерами его использования могут служить оконные операторы или, так называемые, скользящие агрегации в потоке событий. 

Какие бизнес-сценарии требуют использования стейта? Их несколько. Например, если анализируется поток данных на предмет повторяющихся паттернов в некотором временном отрезке (так называемый CEP – Complex Event Processing), или выполняется агрегация событий в течение минут/часов/дней, или если мы обучаем ML-модель на итерационном потоке (здесь стейт вполне может хранить текущую версию фичей). 

Размеры стейтов могут достигать сотен терабайт, а сроки существования и вовсе равны срокам жизни приложений, которые его используют.

Отказоустойчивость стейта Apache Flink

Продолжительный жизненный цикл стейта предъявляет особые требования к обеспечению его отказоустойчивости. Приложения на Apache Flink выполняются в кластере, состоящем из двух основных компонентов: Task Manager и Job Manager. Первый играет роль исполнителя и предоставляет свои ресурсы в виде потоков (слотов) для работы отдельных экземпляров операторов. Второй является своего рода координатором работы кластера, контролируя работу Task Manager’ов и управляя командами на сохранение стейта (и другими процессами за рамками тематики этой статьи). Task Manager’ы общаются между собой через TCP, передавая события между разными экземплярами операторов.

Процесс, в ходе которого выполняется периодическое сохранение (бэкап) стейта, называется checkpoint’ом. Ручной триггер для сохранения стейта называется savepoint’ом.

В рамках статьи мы не будем подробно рассматривать устройство кластера Apache Flink и его внутренние процессы. Важно понять, что такие события существуют и непосредственно связаны с обеспечением отказоустойчивости.

Реализация механизма резервирования стейта зависит от выбранного типа state backend и будет рассмотрена чуть ниже. На текущем этапе важно понять, что Task Manager периодически выполняет консистентный снэпшот стейта, который может быть сериализован и передан на длительное хранение. Наличие снэпшота позволяет относительно быстро десериализовать стейт в случае отказа и возобновить работу приложения с прежней точки.

Диаграмма процесса сохранения стейта представлена ниже:

Рисунок 1 – Процесс сохранения стейта
Рисунок 1 – Процесс сохранения стейта

Де/сериализация и эволюция

Apache Flink нативно поддерживает следующие типы с точки зрения де/сериализации:

  1. Все примитивные типы Java/Scala;

  2. Tuple’ы, Case-классы;

  3. POJO;

  4. AVRO.

Работа с другими типами отдается на откуп Kryo. Можно разрабатывать и собственные де/сериализаторы.

Ниже представлена таблица с результатами тестирования сериализации различных типов данных в Apache Flink 1.8:

Таблица 1 – Производительность сериализации

Serializer

Ops/s

PojoSerializer

305

RowSerializer

475

TupleSerializer

498

Kryo

102

Avro (Reflect API)

127

Avro (SpecificRecord API)

297

Protobuf (Kryo)

376

Thrift (Kryo)

129

Поддерживается эволюция схем стейта. На данный момент можно использовать только Avro и POJO. Наиболее удобным вариантом считается именно Avro за счет готовой спецификации в части описания процессов эволюции схемы. 

Для POJO все в той же степени прозрачно: можно добавлять и удалять поля, но нельзя менять имя класса и типы ранее созданных полей.

В планах развития – реализация поддержки эволюции схемы для tuple’ов, case-классов и форматов Protobuf/Thrift.

Из чего состоит память Task Manager

Task Manager, будучи главной боевой единицей кластера, использует выделенную ему память для хранения различных сегментов данных.

Часть памяти требуется для обслуживания ядра фреймворка (довольно малая), часть – уходит на обслуживание network buffer’ов, с помощью которых Task Manager общается со своими коллегами и “руководителем” в лице Job Manager (для этого используется Netty).

Но главный вопрос все же остается открытым – где хранить стейт? Предлагаю взглянуть на то, как Task Manager использует выделенную для него память:

Рисунок 2 – Распределение сегментов памяти
Рисунок 2 – Распределение сегментов памяти

Типы стейтов Apache Flink можно разделить на две группы:

  1. Timer state – стейты с информацией о запланированных операциях, например,завершение окон или callback’и функций процессинга;

  2. Keyed state – привычные user-defined стейты, которые используются при обработке потока событий.

Вариантов хранения стейта в Apache Flink всего два: HeapKeyedStateBackend и RocksDBStateBackend. Первый интерфейс реализуется двумя классами: MemoryStateBackend и FsStateBackend. Отличаются они только способом хранения снэпшотов, но об этом чуть позже.

Рисунок 3 – Классификация state backend’ов
Рисунок 3 – Классификация state backend’ов

Хранение стейта в Java Heap

При использовании MemoryStateBackend и FsStateBackend стейт хранится в структурах данных внутри хипа Java-машины. Распределение памяти внутри JVM принимает следующий вид (спойлер: Keyed State переезжает в “кучу”): 

Рисунок 4 – Распределение памяти при использовании HeapKeyedStateBackend
Рисунок 4 – Распределение памяти при использовании HeapKeyedStateBackend

В качестве структуры хранения данных используется hashmap, в которой создается вхождение на каждый зарегистрированный в приложении стейт. Этот вариант позволяет сохранять снэпшоты асинхронно, при этом механизмы де/сериализации используются только в момент сохранения или восстановления снэпшотов. Процессинг читает и пишет данные напрямую в память.

Плюсы подхода:

  1. Самая высокая скорость работы (все данные – в памяти, отсутствует де/сериализация при чтении и записи).

Недостатки:

  1. Сборщик мусора может оказывать существенное влияние на производительность;

  2. Структура хранения стейта в памяти имеет ощутимый оверхед по размеру;

  3. Размер стейта напрямую ограничен объемами доступной памяти.

Структура хранения “in-memory”

Как упоминалось ранее, объекты стейта хранятся в памяти в виде хэш-таблицы, в которой каждому бакету соответствует один из стейтов.

Структура каждого entry довольно объемная, поэтому суммарный оверхед хранения каждого объекта в стейте с учетом выравнивания может достигать 160-180 Байт:

Рисунок 5 – Структура хранения стейта in-memory
Рисунок 5 – Структура хранения стейта in-memory

Отличия между FsStateBackend и MemoryStateBackend

Как мы уже поняли, вариантов хранения стейта в «куче» всего два: MemoryStateBackend и FsStateBackend (см. рисунок 3).

Оба идентичны с точки зрения структуры хранения стейта в памяти. Отличие заключается в механизме обеспечения его отказоустойчивости.

MemoryStateBackend – это вариант, который отлично подойдет для локальной разработки, отладки или приложений, использующих очень малые объемы стейта и не требующих его отказоустойчивого восстановления. Этот подход прост и не требует внешнего хранилища.

Когда Job Manager кластера Apache Flink инициирует чекпоинт (то есть запускает механизм сохранения снепшота стейтов), каждый Task Manager кластера отправляет свою часть стейта в Job Manager, а тот, в свою очередь, сохраняет его в собственном хипе. В случае отказа одного из Task Manager’ов Job Manager передает ему снэпшот состояния для восстановления. Job Manager становится единой точкой отказа всего кластера (и даже поддержка HA-конфигурации с резервированием JM’ов в данном случае не поможет).

Рисунок 6 – Процесс сохранения снэпшотов MemoryStateBackend
Рисунок 6 – Процесс сохранения снэпшотов MemoryStateBackend

Подход имеет массу ограничений:

  1. Если узел Job Manager выйдет из строя, вы потеряете возможность восстановления стейта;

  2. Размер стейта по умолчанию для каждого Тask Мanager – 5 Мб. Его можно увеличить, передав соответствующий параметр конструктора при его создании;

  3. Так как Apache Flink во многом опирается на Akka, максимальный размер стейта Task Manager’а не должен превышать значение, указанное в параметре akka frame size. В противном случае вы получите ошибку в момент создания чекпоинта (стейт может не поместиться в указанный лимит при отправке снэпшота в Job Manager);

  4. Суммарный стейт всех Task Manager’ов ограничен объемом памяти Job Manager’а.

Несмотря на многочисленные предостережения, MemoryStateBackend вполне может использоваться в проде, нужно только внимательно отнестись к его слабым сторонам.

При использовании FsStateBackend, в отличие от предыдущего варианта, Task Manager’ы сохраняют снэпшоты стейта во внешнем хранилище. Результат, содержащий метаданные сохраненного снэпшота, передается Job Manager’у, который в итоге собирает общий чекпоинт всего приложения, сохраняя в памяти только часть метаданных.

При аварии Job Manager направляет Task Manager’ам команды для восстановления стейта на основе накопленных ранее метаданных. Сохраненный снэпшот десериализуется из хранилища и загружается в память Task Manager’ов. Работа приложения восстанавливается с последнего сохраненного чекпоинта.

Рисунок 7 – Процесс сохранения снэпшотов при использовании FsStateBackend
Рисунок 7 – Процесс сохранения снэпшотов при использовании FsStateBackend

FsStateBackend вполне можно было бы назвать “MemoryStateBackend на стероидах”. Он совмещает высокие скорости чтения и записи с отказоустойчивостью, но требует дополнительной инфраструктуры для хранения снэпшотов и вносит сетевые издержки при их передаче. Практика показывает, что такой вариант хранения стейта наиболее оправдан для сравнительно небольших объемов данных: до 5-8 Гб стейта на один Task Manager.

Рекомендации по увеличению производительности in-memory стейтов

Как и любое решение, HeapStateBackend и его наследники обладают рядом компромиссов и оптимизаций.

Общие советы по оптимизации:

  1. Следует выбирать де/сериализаторы с эффективными методами копирования. Это следствие используемой структуры хранения в памяти, в которой периодически требуется выполнять deep-копии объектов;

  2. Используйте иммутабельные объекты и структуры при формировании стейта;

  3. Преобразуйте POJO и case-классы в как можно более плоские представления. Это позволит снизить оверхед по памяти и обходу ссылок на объекты;

  4. Экспериментируйте со стратегиями GC, как упоминалось ранее. Здесь они могут иметь существенное влияние на скорость чтения/записи;

Практический совет: лучше развернуть четыре Task Manager’а по одному слоту в каждом, чем один Task Manager с четырьмя слотами. В этом случае мы получим четыре отдельные JVM и изолированные процессы GC, что минимизирует аффекты между соседними операторами во время чистки мусора. 

RocksDB в качестве state backend

Последний и самый многогранный вариант с точки зрения доступных компромиссов и оптимизации – использование RocksDB в качестве хранилища стейтов.

RocksDB обеспечивает высокую скорость записи за счет использования Log-structured merge (LSM)-деревьев. Ситуация с чтением несколько хуже, но применение так называемых MemTable и фильтра Блума позволяют достичь довольно высоких показателей. 

При работе в качестве хранилища стейтов RocksDB использует off-heap память JVM. В этом случае диаграмма распределения памяти внутри виртуальной машины принимает следующий вид:

Рисунок 8 – Распределение памяти при использовании RocksDbStateBackend
Рисунок 8 – Распределение памяти при использовании RocksDbStateBackend

Стейт хранится в сериализованном виде в off-heap, на диске или в локальном хранилище. Для каждого зарегистрированного стейта создается собственное семейство колонок RocksDB (колонку можно воспринимать как отдельную таблицу).

Хранение записей в формате ‘ключ-значение’ организовано в виде LSM-дерева. Ключом выступает сериализованный кортеж, состоящий из namespace’а (контекст), keygroup’ы (группа ключей – залог масштабируемости стейта в Apache Flink) и key (ключ вхождения стейта). К сожалению, данные де/сериализуются при каждом чтении и записи, даже если запрашиваемый ключ еще находится в памяти.

Плюсы подхода:

  1. Отсутствует негативное воздействие со стороны GC, так как работа ведется в off-heap;

  2. Довольно компактное представление данных с точки зрения хранения (малый оверхед по объему);

  3. Поддерживаются инкрементальные чекпоинты (фиксируются только дельты изменений, а не весь стейт);

  4. Размер стейта ограничен только доступными объемами дискового пространства.

Недостатки подхода:

  1. Значительно более низкая скорость чтения/записи по сравнению с in-memory хранением (до 10 и более раз медленнее). 

Единственный и самый существенный недостаток подхода – скорость работы. Запросы на чтение/запись измеряются сотнями миллесекунд, вместо десятков.

Каждое чтение или запись в стейт требует де/сериализации, пересекая границу JNI на пути к колонкам RocksDB.

Экскурс в архитектуру RocksDB (в проекции на Apache Flink)

Данные в RocksDB записываются в виде пар ключ-значение, которые хранятся в виде сериализованных массивов байт. Все данные представлены в отсортированном виде.

Write path

Столпами архитектуры RocksDB являются logfile, SST (Sorted String Table) и Memtable. Все записи сперва попадают в active memtable, которая является in-memory структурой (этим и обусловлена высокая скорость записи в RocksDB – фактически она выполняется напрямую в память). Параллельно запись может вестись в своего рода аналог WAL - logfile, но этот функционал в Apache Flink намерено отключен, так как сохранность состояния в случае отказов здесь обеспечивается механизмом чекпоинтов.

Как только memtable достигает определенного размера, она становится сначала read-only memtable, а затем записывается в SST-файл, который сохраняется в локальном хранилище. Когда active memtable становится read-only, новые записи сохраняются в новой active memtable.

SST-файлы подвержены периодической компакции, которая в целом похожа на механизм работы таковой в Apache Kafka: иммутабельные SST-файлы постепенно объединяются в фоновом процессе, образуя более крупные структуры.

Read path

При появлении запроса на чтение данных из RocksDB, в первую очередь, проверяется содержимое active memtable на предмет наличия запрашиваемого ключа. Если ключ найден, то результат возвращается клиенту.

Если запрашиваемый ключ не найден, выполняется поиск в read-only memtable. Так как read-only memtable может быть несколько, поиск выполняется в порядке от новых к старым. Если ключ найден, то результат возвращается клиенту.

Если ключ снова не удалось отыскать ни в одной из memtable, то выполняется поиск в SST (как мы помним, они хранятся на диске). В этом случае возможно применение фильтра Блума. Но используйте эту опцию с осторожностью, так как она легко может поглотить лишние пару гигабайт драгоценной памяти.

Отдельно стоит упомянуть так называемый read-only block cache. Это in-memory структура данных, которая хранит часто запрашиваемые ключи в виде несжатых SST. Read-only block cache используется только для чтения и помогает избежать лишних обращений к диску для декомпрессии и поиска ключей в SST.

На практике, при достаточном объеме off-heap, значительная часть чтений может выполняться из памяти.

Рисунок 9 – Процессы чтения и записи в RocksDB
Рисунок 9 – Процессы чтения и записи в RocksDB

Обеспечение отказоустойчивости стейта при работе с RocksDB

Чтобы обеспечить восстановление стейта приложения в случае отказов, при наступлении чекпоинта, когда Job Manager кластера Apache Flink отправляет Task Manager’ам команду на сохранение консистентных снэпшотов, SST-файлы копируются из локального диска в удаленное, распределенное, отказоустойчивое хранилище, например, HDFS или S3 (в обоих случаях используется коннектор Presto).

При сбое Job Manager дает команду на восстановление стейта Task Manager’ам, сообщая, в том числе, и пути к SST-файлам. Фактически SST копируются обратно в локальное хранилище RocksDB и работа приложения восстанавливается с последнего сохраненного снэпшота.

Рисунок 10 –  Восстановление стейта при использовании RocksDB
Рисунок 10 – Восстановление стейта при использовании RocksDB

Рекомендации по увеличению производительности RocksDB в Apache Flink

С точки зрения тюнинга RocksDB

  1. Используйте локальные диски для хранения SST. RocksDB умеет гибко конфигурироваться с учетом используемых типов дисков (HDD/SSD). Очень плохая идея – использовать EBS/СХД. Вы легко можете нагрузить сеть до очень высоких значений (см. The Impact of Disks on RocksDB State Backend in Flink: A Case Study);

  2. Экспериментируйте с размером read-only block cache (параметр block_cache_size), обменивая память на скорость чтения;

  3. Экспериментируйте с размером active memtable (параметр write_buffer_size) с аналогичными целями + скорость записи;

  4. Меняйте максимальное количество memtable, по достижении которого они сохраняются в SST (параметр max_write_buffer_number);

  5. Пробуйте фильтр Блума, но помните о дополнительном (5-10% от общего объема) оверхеде по памяти.

С точки зрения кода

  1. Старайтесь использовать максимально быстрые де/сериализаторы и форматы хранения данных в стейте (см. бенчмарк в таблице 1);

  2. Не используйте Map и List, вложенные в стейт типа ValueState. Старайтесь заменять любые вложенные структуры данных на MapState и ListState, предоставляемые фреймворком. В противном случае, при каждом чтении будет выполняться десериализация всех вхождений вашей коллекции, что может нанести огромный по своим масштабам урон производительности;

  3. Анализируйте метрики RocksDB;

  4. Еще раз – не используйте EBS или СХД!

RocksDbStateBackend отлично подходит для работы со стейтами большого объема: от десятков гигабайт до сотен и тысяч терабайт.

Timer State

Ранее мы упомянули о существовании обособленного типа стейта для таймеров. Таймеры – это компоненты, которые используются для выполнения запланированных действий (закрытие окон, callback’ов и т. д.). Исторически они могли храниться только в памяти, но несколько релизов назад для них добавили поддержку RocksDB. 

В памяти таймеры хранятся в массиве, сопровождаемом дополнительной хэш-таблицей-спутником, призванной снизить сложность выполнения операций peek/poll/contains и delete. Каждая запись массива содержит по две ссылки на объекты: key (ключ текущего контекста), namespace, одно поле типа Long, в котором хранится timestamp срабатывания таймера, и одно поле типа Int для хранения индекса в массиве.

Массив работает в паре с хэш-таблицей-спутником, хранящей пары типа <Timer; Timer>, в которой каждое вхождение связано ссылкой на ключ текущего контекста из массива. Вставка и удаление таймеров выполняются с логарифмической сложностью, определение наличия элемента в массиве – за константу, как и его извлечение.

Рисунок 11 – Структура хранения таймеров in-memory
Рисунок 11 – Структура хранения таймеров in-memory

В памяти всегда хранится только один таймер на один timestamp вне зависимости от количества связанных с ним ключей. Сделано это намеренно, так как в ранних версиях Apache Flink пользователи часто создавали таймеры с гранулярностью до миллисекунды, а контекстом для каждого таймера являлся ключ стейта. Создавалась ситуация многократного дублирования таймеров с одним и тем же timestamp в контексте различных ключей, что приводило к нерациональному использованию памяти. Тем не менее полезная рекомендация: использовать гранулярность таймеров не менее секунды при выборе in-memory хранения. 

С недавних пор таймеры можно хранить и в RocksDB. В этом случае в базе данных создается отдельная колонка, ключом которой является timestamp семейства таймеров. Рекомендуемую гранулярность в этом случае можно снизить до сотен или даже десятков миллисекунд, если это требуется с точки зрения бизнес-задачи.

Таймеры, которые должны сработать в ближайшее время, заранее подтягиваются в память, чтобы гарантировать, что сработка будет выполнена вовремя и задержка чтения из RocksDB не сдвинет момент выполнения таймера на пару миллисекунд. 

Выводы

Работа со стейтом в Apache Flink не только удобная, но еще и быстрая, надежная. Соблюдая рекомендации, приведенные в этой статье, можно гибко управлять стейтом размером от нескольких мегабайт до сотен и тысяч терабайт с гарантированно высокой скоростью и отказоустойчивостью.

Если ваше приложение не требует больших объемов хранения и не привередливо к отказоустойчивому восстановлению, смело выбирайте MemoryStateBackend.

Хотите иметь все преимущества чтения и записи напрямую в память для стейтов средних размеров? Тогда FsStateBackend станет отличным вариантом.

Ну, а если вы – промышленный гигант потоковой обработки данных и накопили стейт размером в сотни и даже тысячи терабайт, посмотрите в сторону RocksDBStateBackend и не забудьте уделить должное внимание ее оптимизации.

Комментарии (2)


  1. derikn_mike
    19.08.2021 21:11

    не увидел про global state , он в флинте есть?

    например в kafka stream даже если делаешь 10 приложений на 10 партиций с глобал стейтом и запросом

    "select count(*) from myTopic"


    1. AlexSergeenko Автор
      19.08.2021 21:47

      Верно, аналог есть и в Apache Flink, только называется иначе -

      Operator State: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#operator-state

      Обладает довольно узкими сценариями использования, оттого не рассматривался.