Привет, Хаброжители!

Сейчас, когда данные генерируются непрерывно и в огромных объемах, умение эффективно обрабатывать события в реальном времени становится критически важным навыком для разработчиков. Книга от Билла Беджека — это подробное руководство по созданию мощных приложений на основе Apache Kafka, одной из самых надежных и популярных платформ для потоковой обработки данных.

Автор, опытный инженер и участник проекта Apache Kafka, предлагает читателям практический подход к освоению Kafka Streams и других компонентов экосистемы Kafka. В книге рассматриваются не только основы, но и продвинутые техники, включая интеграцию с Kafka Connect, управление схемами через Schema Registry, работу с ksqlDB и тестирование потоковых приложений.
Об авторе
Билл Беджек (Bill Bejeck) — участник проекта и член консультационного совета Apache Kafka®. Более 20 лет проработал инженером-программистом. В настоящее время — инженер DevX в компании Confluent, а до этого более трех лет был инженером в команде Kafka Streams. До Confluent он работал над созданием различных приложений для правительства США и использовал распределенное программное обеспечение, такое как Apache Kafka, Spark и Hadoop. Кроме того, он регулярно ведет блог “Беспорядочные размышления о написании кода”.

О чем эта книга?


Корпоративные приложения ежедневно сталкиваются с необходимостью обрабатывать миллионы событий, и Kafka Streams предоставляет для этого интуитивно понятный и надежный инструментарий. Второе издание книги дополнено новыми материалами, охватывающими более широкий спектр архитектур потоковой обработки и интеграцию данных через Kafka Connect.

Билл Беджек написал эту книгу, представляя себя в роли напарника по программированию: он шаг за шагом ведет читателя от основ Kafka до сложных сценариев использования Kafka Streams. Книга подойдет как новичкам, так и опытным разработчикам, желающим углубить свои знания в потоковой обработке данных от создания простого приложения через добавление нового функционала к приложению потоковой передачи событий.

Эта книга охватывает всю платформу потоковой передачи событий Kafka.

Для кого эта книга?


Книга адресована разработчикам, которые хотят освоить потоковую обработку событий с помощью Kafka Streams. Хотя знание распределенного программирования и основ Kafka будет полезным, автор подробно объясняет все необходимые концепции, что делает материал доступным даже для тех, кто только начинает работать с Kafka.

Java-разработчики среднего и высокого уровня, знакомые с лямбда-выражениями и сериализацией, найдут здесь ценные примеры и рекомендации по созданию эффективных приложений. Исходный код в книге написан на Java 17.

Структура книги


Книга состоит из трех частей, каждая из которых последовательно раскрывает ключевые аспекты работы с Kafka и Kafka Streams.

Часть I: Основы потоковой передачи событий и описание различных частей экосистемы Kafka.

Первая часть книги знакомит читателя с экосистемой Kafka, объясняя, как работают брокеры, топики, разделы и другие ключевые компоненты.
  • Глава 1 посвящена тому, как и почему потоковая обработка стала необходимым элементом широкомасштабной обработки данных в режиме реального времени. В ней также приводится ментальная модель различных компонентов: брокеров, клиентов, Kafka Connect, Schema Registry и, конечно же, Kafka Streams.
  • Глава 2 — это введение для тех, кто еще не работал с Kafka. Здесь объясняются базовые концепции: брокеры, топики, говорится о некоторых аспектах мониторинга.
Часть II: Передача данных и управление схемами

Во второй части рассматриваются инструменты для работы с данными в Kafka: Schema Registry, клиенты Producer и Consumer, а также Kafka Connect.
  • Глава 3 рассказывает о Schema Registry — компоненте, помогающем управлять эволюцией схем данных.
  • Глава 4 посвящена клиентам Kafka (производители и потребители), которые служат строительными блоками для для Kafka Streams и Kafka Connect.
  • Глава 5 раскрывает возможности Kafka Connect получать данные в Kafka через коннекторы-источники и экспортировать их во внешние системы с помощью коннекторов-приемников.
Часть III: Разработка приложений Kafka Streams

Третья часть — самая важная в книге. Здесь автор подробно охватывает разработку приложений Kafka Streams, знакомит с ksqlDB, тестированием приложения потоковой передачи событий и интеграцией с Spring Framework.
  • Глава 6 — введение в Kafka Streams с примерами приложений. Попутно вы познакомитесь с Kafka Streams DSL.
  • Глава 7 объясняет, как состояние используется в потоковых приложениях.
  • Глава 8 знакомит с интерфейсом KTable — потоком для обновления записей.
  • Глава 9 посвящена оконным операциям и отметкам времени.
  • Глава 10 раскрывает Processor API для более точного управления.
  • Глава 11 рассказывает о ksqlDB — инструменте, позволяющем писать приложения потоковой передачи событий без программного кода, а исключительно на SQL.
  • Глава 12 объясняет интеграцию Kafka с Spring Framework.
  • Глава 13 знакомит с Interactive Queries (IQ).
  • Глава 14 посвящена тестированию приложений Kafka Streams.
«Kafka Streams в действии» — это практическое руководство для разработчиков, которые хотят освоить потоковую обработку событий с помощью Kafka. Второе издание дополнено новыми примерами и актуальными темами, что делает его незаменимым ресурсом для всех, кто работает с большими данными и микросервисами.
Предлагаем ознакомиться с отрывком «KTable: поток обновлений»
Чтобы усвоить концепцию потока обновлений, полезно сравнить его с потоком событий и рассмотреть различия. Разберем конкретный пример отслеживания обновлений цен на акции (рис. 8.1).

image

Рис. 8.1. Диаграмма неограниченного потока котировок акций

Как видите, каждая котировка акций представляет собой дискретное событие, они не связаны друг с другом. Даже если за несколько котировок отвечает одна компания, они все равно рассматриваются по отдельности. Такое представление событий соответствует потоку данных событий, описываемому KStream.

Взглянем теперь, как эта концепция соотносится с таблицами базы данных. Каждая запись представляет результат операции вставки в таблицу, в которой роль первичного ключа играет число, монотонно увеличивающееся в каждой операции вставки, как показано в таблице котировок акций на рис. 8.2.

Далее обратимся снова к потоку записей. Поскольку записи автономны, данный поток соответствует операциям вставки в таблицу. На рис. 8.3 эти две концепции объединены в целях иллюстрации.

image

Рис. 8.2. Простая таблица базы данных с курсом акций различных компаний. В ней есть столбец с ключом, а также другие столбцы со значениями. Ее строки можно рассматривать как пары «ключ — значение», если «свалить» все остальные столбцы в контейнер «значение»

image

Рис. 8.3. Поток индивидуальных событий по сравнению со вставками в таблицу базы данных. Аналогично можно представить построчную потоковую обработку таблицы

Самое главное здесь — возможность рассматривать поток событий аналогично последовательности операций вставки в таблицу, благодаря чему можно лучше разобраться в использовании потоков данных для работы с событиями. Следующий этап — изучение случая, когда события в потоке взаимосвязаны.

Обновления записей (журнал изменений)


Допустим, вы решили отслеживать поведение покупателей, для чего получаете поток транзакций покупателя, но теперь отслеживаете их действия в различные моменты времени. Если добавить ключ — идентификатор покупателя, то можно связать события покупки друг с другом и получить поток обновлений вместо потока событий.

Если поток событий мы сравнивали с журналом, то поток обновлений можно сравнить с журналом изменений. Рисунок 8.4 иллюстрирует эту концепцию.

image

Рис. 8.4. В журнале изменений каждая входящая запись заменяет предыдущую запись с тем же ключом (если таковая есть). В потоке записей у нас имеется четыре события, а в потоке обновлений (в журнале изменений) их только два

На этом рисунке видна взаимосвязь между потоком обновлений и таблицей базы данных. Как журнал (log), так и журнал изменений (changelog) отражают добавляемые в конец файла входящие записи. В журнале видны все записи, а в журнале изменений — только последняя запись для каждого ключа.

ПРИМЕЧАНИЕ

Как в журнале, так и в журнале изменений записи при поступлении добавляются в конец файла. Различие в том, что журнал используется, когда нужно видеть все записи, а журнал изменений — только когда нужно видеть последнюю запись для каждого ключа.

Для сокращения журнала с сохранением последних записей для всех ключей можно воспользоваться сжатием журналов, обсуждавшимся в главе 2. Результат сжатия журнала показан на рис. 8.5. Раз нас интересуют только последние значения, то можно удалить более старые пары «ключ — значение».

image

Рис. 8.5. Слева показан журнал до сжатия, в котором можно заметить повторяющиеся ключи с разными значениями — обновления. Справа показан журнал после сжатия — для каждого ключа оставлено только последнее значение, и размер журнала за счет этого уменьшился

ПРИМЕЧАНИЕ

Информация для этого раздела взята из статей Джея Крепса (Jay Kreps) Introducing Kafka Streams: Stream Processing Made Simple (http://mng.bz/49HO) и The Log: What Every Software Engineer Should Know About Real-time Data’s Unifying Abstraction (http://mng.bz/eE3w).

Вы уже знакомы с потоками событий по работе с KStream. Следующий этап, после того как мы разобрались во взаимосвязи между потоками и таблицами, — сравнение потока событий с потоком обновлений. Для представления журналов изменений (потоков обновлений) мы будем использовать абстракцию KTable.

KStream и KTable в действии


Давайте сравним KStream и KTable. Для этого запустим простое приложение слежения за котировками акций. KStream и KTable будут читать и выводить записи в консоль с помощью метода print(). Приложение слежения за котировками выполнит три итерации и создаст девять записей.

ПРИМЕЧАНИЕ

В KTable нет таких методов, как print() и peek(), поэтому для вывода записей нужно преобразовать KTable из потока обновлений в поток событий, используя метод toStream().

В листинге 8.1 показан пример программы, которая выводит события изменения котировок акций в консоль (находится в src/main/java/bbejeck/chapter_8/KStreamVsKTableExample.java). Исходный код можно найти по адресу github.com/bbejeck/KafkaStreamsInAction2ndEdition.

Листинг 8.1. Вывод в консоль KTable и KStream

image

ИСПОЛЬЗОВАНИЕ ОБЪЕКТОВ SERDE ПО УМОЛЧАНИЮ
При создании KTable и KStream мы не задавали никаких объектов Serde. А равно и при обоих вызовах метода print(). А все благодаря тому, что мы внесли в конфигурацию объекты Serde для использования по умолчанию, примерно вот так:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
StreamsSerdes.StockTickerSerde().getClass().getName());

Если же вы хотели бы использовать другие типы данных, то для чтения или внесения записей необходимо передать объекты Serde через перегруженные методы.

С точки зрения объекта KTable он получил не девять отдельных записей, а три исходных записи и две порции обновлений и вывел в консоль только последнюю их порцию. Обратите внимание, что записи в KTable совпадают с последними тремя записями, опубликованными KStream. Мы обсудим механизмы, с помощью которых KTable выводит только обновления, в следующем разделе.

Главный вывод: записи с одинаковыми ключами в потоке данных по своей сути обновления, а не самостоятельные новые записи (рис. 8.6). Именно понятие потока обновлений лежит в основе интерфейса KTable.

image

Рис. 8.6. Вывод записей с одинаковыми ключами: KTable по сравнению с KStream

ОБЪЕКТЫ KTABLE ИМЕЮТ ХРАНИМОЕ СОСТОЯНИЕ


В предыдущем примере, когда мы создали таблицу с помощью builder.table(), библиотека Kafka Streams также создала хранилище StateStore для отслеживания состояния, которое по умолчанию является постоянным. Поскольку хранилища состояний работают только с массивами байтов, нам нужно передать экземпляры Serde, чтобы хранилище могло (де)сериализовать ключи и значения. Конкретные экземпляры Serde мы передаем потоку событий с помощью конфигурационного объекта Consumed, и точно так же можно поступить при создании KTable:

builder.table(STOCK_TICKER_TABLE_TOPIC,
              Consumed.with(Serdes.String(),
                           StockTradeSerde()));

Теперь хранилище состояний будет использовать объекты Serde, которые мы передали в объекте Consumed. Перегруженная версия StreamsBuilder.table также принимает экземпляр Materialized, что позволяет настроить тип хранилища и задать его имя, чтобы обеспечить доступность для запросов. Интерактивные запросы мы обсудим в главе 13.

Можно также создать KTable напрямую с помощью метода KStream.toTable. Этот метод меняет интерпретацию записей, после вызова они будут рассматриваться не как события, а как обновления. Можно также использовать метод KTable.toStream для преобразования потока обновлений в поток событий. Последнее преобразование мы обсудим в следующем разделе, когда будем рассматривать KTable. Главное, что следует запомнить: KTable создается непосредственно из топика, а Kafka Streams автоматически добавляет хранилище состояний, поддерживающее KTable.

Выше я рассказал, как KTable обрабатывает операции вставки и обновления, а как удалить запись? Чтобы удалить запись из KTable, нужно отправить пару «ключ — значение» со значением null. Значение null действует как маркер, который на сленге Kafka Streams называют «надгробием», и в конечном счете удаляется из хранилища состояний и топика журналирования изменений, а следовательно, и из таблицы.

Как и KStream, KTable распределяется по задачам, число которых определяется количеством разделов в исходном топике. Это распределение по задачам означает, что записи в таблице могут находиться в разных экземплярах приложения.

KTABLE


В KTable есть методы, похожие на методы в KStream: filter, filterNot, mapValues и transformValues. Они тоже поддерживают возможность объединения в цепочки, возвращая новый экземпляр KTable.

Функционально эти методы очень похожи на аналогичные методы в KStream, но имеют некоторые отличия: к парам «ключ — значение», в которых значение равно null, они применяют семантику удаления.

Вот как семантика удаления влияет на работу KTable.
  1. Если входная пара «ключ — значение» содержит значение null, то узел-обработчик не выполняет с ней никаких действий, а просто пересылает ее в новую таблицу как маркер «надгробия».
  2. В методах filter и filterNot, если запись не соответствует предикату, она заменяется маркером «надгробия», который пересылается в новую таблицу.
Пример вы найдете в KTableFilterExample в пакете bbejeck.chapter_8. Он применяет KTable.filter к потоку записей, часть из которых имеет значения null, и отфильтровывает некоторые значения, не равные null. Но мы уже обсуждали фильтрацию ранее, поэтому я не буду рассматривать этот пример здесь и дам вам возможность сделать это самостоятельно.

Теперь обсудим агрегирование и соединение с помощью KTable.

АГРЕГИРОВАНИЕ С ПОМОЩЬЮ KTABLE


Агрегирование в KTable действует иначе, чем в KStream. Давайте исследуем имеющиеся различия на примере. Представьте, что мы создаем приложение для отслеживания цен на акции. Для любой акции нас интересует только последняя цена, поэтому имеет смысл использовать KTable. Кроме того, мы хотели бы отслеживать динамику работы различных сегментов рынка. Например, мы могли бы сгруппировать акции Google, Apple и Confluent в сегмент рынка технологий. Поэтому нам понадобится выполнить агрегирование и сгруппировать различные акции по сегменту рынка. Реализация агрегирования с помощью KTable будет выглядеть так, как показано в листинге 8.2.

Листинг 8.2. Агрегирование с помощью KTable

image

Мы создаем KTable, выполняем groupBy и изменяем ключ, используя сегмент рынка, что приведет к перераспределению. Исходный ключ — это символ акции, поэтому разные акции из данного сегмента рынка могут оказаться в разных разделах.

Но это требование скрывается за тем фактом, что при агрегировании с помощью KTable всегда нужно выполнять операцию groupBy. Почему? Напомню, что в KTable входной ключ считается первичным ключом. Как и в реляционной базе данных, группировка по первичному ключу всегда приводит к получению единственной записи. Поэтому записи нужно группировать по другому полю — комбинирование первичного ключа и группирующих полей даст результаты, подходящие для агрегирования. Как и в KStream, вызов метода KTable.groupBy возвращает промежуточную таблицу — KGroupedTable, которая используется для вызова метода aggregate.

Есть еще одно отличие. Метод aggregate в KTable, как и в KStream, принимает в первом параметре экземпляр Initializer, задающий значение по умолчанию для первой операции агрегирования. Но далее он принимает два агрегатора, один из которых добавляет новое значение в агрегат, а другой вычитает из агрегата старое значение предыдущей записи с тем же ключом. Взгляните на рис. 8.7, чтобы понять, что происходит.

image

Рис. 8.7. Операция агрегирования в KTable использует два агрегатора: суммирующий и вычитающий

Вот еще один взгляд на агрегирование. Выполняя то же самое в реляционной таблице и суммируя значения в записях, отобранных группировкой, мы бы получили только единственное последнее значение на запись. Например, SQL-эквивалент показанного выше агрегирования в KTable мог бы выглядеть как в листинге 8.3.

Листинг 8.3. SQL-эквивалент агрегирования в KTable

SELECT market_segment,
       sum(share_volume) as total_shares,
       sum(share_price * share_volume) as dollar_volume
       FROM stock_alerts
       GROUP BY market_segment;

Когда поступает новая запись, первым делом обновляется таблица оповещений. Затем запускается агрегирующий запрос, чтобы извлечь обновленную информацию. Это в точности описывает работу KTable. Новая входная запись обновляет таблицу stock_alerts, и она передается для агрегирования. В обновлении для каждой акции может быть только одна запись, поэтому производится добавление в агрегат значения из новой записи и удаление значения из предыдущей записи.

Кому-то процесс может показаться сложным для полного понимания, поэтому поясню его несколькими иллюстрациями. Представим, что были получены некоторые записи и запустился расчет некоторых агрегатов (рис. 8.8).

image

Рис. 8.8. Записи поступают в объект KTable, который вычисляет агрегат для каждой из них

Операция агрегирования суммирует количество акций в каждой транзакции и сумму торгов в долларах, которая вычисляется путем умножения цены акции на количество. Затем происходит новая сделка с акциями (рис. 8.9).

Информация о сделке, включающей акции CFLT (компании Confluent), поступает в исходный топик и передается в KTable. Поскольку для CFLT нет предыдущей записи, выполняется только вставка записи в исходную таблицу. Так как компания Confluent относится к технологическому сегменту, необходимо обновить этот агрегат (рис. 8.10).

Для обновления агрегата в поле share_volume добавляется количество проданных акций и увеличивается поле dollar_volume путем умножения количества акций на значение в поле со стоимостью одной акции. Ситуация становится интереснее, когда поступает информация еще об одной сделке с акциями CFLT. Взгляните на рис. 8.11.

image

Рис. 8.9. Появляется информация о новой сделке, и обновляется исходный объект KTable

image

Рис. 8.10. С вновь полученной записью обновляется агрегат

image

Рис. 8.11. Появляется информация еще об одной сделке с акциями CFLT, и запускается серия событий обновления

Поскольку в KTable уже есть запись для акций CFLT, естественной мыслью является обновление ключа CFLT новым значением. Но сначала библиотека Kafka Streams должна получить предыдущую запись из хранилища состояний и сохранить ее в переменной, а уже потом добавлять новое значение в таблицу. Мы должны сохранить последнее значение, потому что оно необходимо для обновления агрегата далее в потоке.

В каждом сегменте рынка в результатах агрегирования каждой акции соответствует одна запись. Когда в сегмент поступает новая запись, выполняется двухэтапный процесс обновления. Во-первых, из агрегата вычитается предыдущая и прибавляется новая запись. Дополнительно из-за того, что ключ, возвращаемый функцией groupBy, мог измениться, Kafka Streams будет пересылать старые и новые значения отдельно. Обобщенная схема всего процесса обновления агрегата в KTable показана на рис. 8.12.

image

Рис. 8.12. Обобщенная схема всего процесса обновления агрегата в KTable

Теперь, увидев, как работает агрегирование в KTable, рассмотрим экземпляры Aggregator. Но, так как мы уже рассматривали их в главе 7, исследуем только логику суммирования и вычитания. Имейте в виду, что, несмотря на простоту примера, основные принципы, которые он демонстрирует, применимы к любой операции агрегирования в KTable. Начнем с суммирования (некоторые детали опущены для простоты) (листинг 8.4).

Листинг 8.4. Логика суммирования в Aggregator

image

Как видите, суммирование имеет простую логику работы: из последнего события StockAlert извлекается количество акций и прибавляется к текущему агрегату. То же самое делается с суммой торгов в долларах (после вычисления путем умножения количества акций на цену одной акции).

ПРИМЕЧАНИЕ

Объекты Protobuf неизменяемы, поэтому при обновлении значений для каждого объекта необходимо создавать новые экземпляры, используя сгенерированный конструктор.

Теперь перейдем к логике вычитания. Как вы наверняка догадались, она выполняет обратную операцию — вычитает те же значения и результаты вычислений для предыдущей записи с информацией об акциях той же компании в данном сегменте рынка. Поскольку сигнатура не изменилась, я покажу только вычисления (некоторые детали опущены для простоты) (листинг 8.5).

Листинг 8.5. Логика вычитания в Aggregator

image

Логика проста: мы вычитаем значения, полученные в StockAlert, которые Kafka Streams заменила в агрегате.

Итак, как мы выяснили, операция агрегирования в KTable сохраняет только последнее значение для каждой уникальной комбинации исходного ключа KTable и ключа, используемого для группировки. Здесь стоит отметить, что KTable также предоставляет методы reduce и count, при работе с которыми вы будете выполнять аналогичные шаги: сначала вызывать groupBy, а в вызов reduce передавать реализацию Reducer для суммирующего и вычитающего агрегатора. Я не буду рассматривать их здесь, чтобы не повторяться, но в исходном коде примеров для книги вы сможете увидеть, как используются оба этих метода, reduce и count.

На этом мы завершаем знакомство с KTable. Но прежде, чем перейти к более сложным операциям, я хотел бы рассмотреть еще одну абстракцию таблиц — GlobalKTable.

GLOBALKTABLE


Я уже упоминал GlobalKTable выше в этой главе, когда говорил, что таблица KTable делится на разделы и, следовательно, распределена между экземплярами приложения Kafka Streams (конечно, с тем же идентификатором приложения). Другими словами, KTable содержит только записи из одного раздела топика. Уникальность GlobalKTable в том, что она потребляет все данные из исходного топика. Это означает, что в таблице содержится полная копия всех записей из всех экземпляров приложения. Взгляните на рис. 8.13, иллюстрирующий это.

image

Рис. 8.13. GlobalKTable содержит все записи в топике из всех экземпляров приложения

Как видите, исходный топик для KTable имеет три раздела, и в каждом из трех экземпляров приложения объект KTable потребляет записи только из одного раздела. А объект GlobalKTable, напротив, имеет полную копию содержимого всех трех разделов исходного топика в каждом экземпляре. Kafka Streams материализует GlobalKTable на локальном диске в KeyValueStore, но не создает топик журналирования изменений, потому что исходный топик служит резервной копией для восстановления. Код в листинге 8.6 показывает, как создать объект GlobalKTable.

Листинг 8.6. Создание GlobalKTable

StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, String> globalTable =
  builder.globalTable("topic",
                      Consumed.with(Serdes.String(),
                                   Serdes.String()));

GlobalKTable не предлагает API. Поэтому возникает естественный вопрос: когда использовать GlobalKTable, а когда KTable? GlobalKTable особенно удобно использовать для распространения информации по всем экземплярам Kafka Streams с целью использования ее в соединениях. Например, представьте, что у нас есть поток покупок с идентификатором пользователя. С помощью последовательности символов и цифр, представляющих человека, совершившего покупку, можно извлечь кое-какие сведения.

Но если у нас будет возможность добавить имя, адрес, возраст, род занятий и т. д., то мы получим больше информации о событиях. Поскольку информация о пользователях меняется нечасто (то есть люди не меняют работу или адрес еженедельно), GlobalKTable хорошо подходит для распространения достаточно статичных данных. Каждая таблица имеет полную копию данных, поэтому она лучше всего подходит для обогащения потока событий.

Еще одно преимущество GlobalKTable, обусловленное потреблением всех разделов исходного топика, заключается в том, что при выполнении соединения с KStream ключи не обязательно должны совпадать. Для создания соединения можно использовать значение из потока. Взгляните на рис. 8.14, чтобы понять, как это работает.

image

Рис. 8.14. Поскольку GlobalKTable материализует все разделы исходного топика, для создания соединения можно использовать значение из потока

Поскольку поток содержит значения, соответствующие ключам таблицы, а таблица содержит данные из всех разделов, вы можете извлечь информацию, необходимую для создания соединения, из значения таблицы. Как это реализовать и обогатить поток, я расскажу в подразделе 8.6.3, где мы рассмотрим соединение KStream–GlobalKTable.

СОЕДИНЕНИЕ С ТАБЛИЦАМИ


В предыдущей главе мы познакомились с операцией соединения двух объектов KStream, однако Kafka Streams поддерживает также соединения KStreamKTable, KStreamGlobalKTable и KTableKTable. Но зачем может понадобиться вычислять соединение между потоком и таблицей? Соединения «поток — таблица» дают прекрасную возможность обогащения событий дополнительной информацией. Чтобы иметь возможность соединения «поток — таблица» и «таблица — таблица», обе стороны должны иметь совместимое секционирование, то есть базовые исходные топики должны иметь одинаковое количество разделов. Схема на рис. 8.15 поможет вам понять, как выглядит совместимое секционирование на уровне топиков.

image

Рис. 8.15. Топики с совместимым секционированием имеют одинаковое количество разделов, поэтому экземпляры KStream и KTable будут иметь задачи, работающие с одним и тем же разделом

Как видите, это совершенно разные топики, просто имеющие одинаковое количество разделов. Теперь рассмотрим пару иллюстраций, чтобы понять, зачем может понадобиться выполнять соединение. Начнем с положительного примера на рис. 8.16.

image

Рис. 8.16. Топики с разным количеством разделов размещают одни и те же ключи в разных разделах

Судя по иллюстрации, оба ключа, по которым осуществляется соединение, идентичны. В результате оба попадут на раздел 0, поэтому в данном случае соединение возможно. Далее рассмотрим отрицательный случай на рис. 8.17.

image

Рис. 8.17. Топики с разным количеством разделов размещают одни и те же ключи в разных разделах

Даже притом что ключи идентичны, из-за разного количества разделов записи с одинаковыми ключами окажутся в разных разделах, то есть операция соединения окажется невозможной. Однако это не означает, что ничего нельзя сделать, чтобы обеспечить возможность соединения. Если у вас есть экземпляры KStream и KTable, для которых нужно выполнить соединение, но они имеют несовместимое секционирование, то вам нужно выполнить операцию перераспределения. Мы рассмотрим пример, как это сделать, в подразделе 8.6.1. Обратите внимание, что, поскольку GlobalKTable имеет полную копию записей, для вычисления соединения между потоком и глобальной таблицей не требуется иметь совместимое секционирование.

Познакомившись с требованиями к секционированию, рассмотрим конкретный пример, почему может понадобиться выполнить соединение потока с таблицей. Представим, что у нас есть поток событий щелчков кнопкой мыши, выполненных пользователем на сайте, а также таблица текущих пользователей, выполнивших вход. Объект события щелчка кнопкой мыши содержит только идентификатор пользователя и ссылку на страницу, куда выполнялся переход, но нам хотелось бы иметь дополнительную информацию. Для этого можно выполнить соединение потока щелчков кнопкой мыши с таблицей пользователей, и мы сможем в режиме реального времени получить гораздо больше полезной информации о закономерностях в действиях пользователей нашего сайта. Пример реализации такого соединения показан в листинге 8.7.

Листинг 8.7. Соединение «поток — таблица» для обогащения потока событий дополнительной информацией

KStream<String, ClickEvent> clickEventKStream =
                builder.stream("click-events",
                        Consumed.with(stringSerde, clickEventSerde));

KTable<String, User> userTable =
        builder.table("users",
                Consumed.with(stringSerde, userSerde));

clickEventKStream.join(userTable, clickEventJoiner)
        .peek(printKV("stream-table-join"))
        .to("stream-table-join",
                Produced.with(stringSerde, stringSerde));

Код в этом примере создает поток событий щелчков кнопкой мыши и таблицу пользователей, выполнивших вход. В этом примере предполагается, что в качестве ключа в потоке и первичного ключа в таблице используется идентификатор пользователя, поэтому мы можем выполнить соединение между ними без лишних ухищрений. Затем вызывается метод join потока, которому в первом параметре передается таблица.

Соединение потоков и таблиц


Теперь я хотел бы представить некоторые различия между соединениями «поток — таблица» и «поток — поток», с последним из которых вы познакомились в предыдущей главе. Соединения «поток — таблица» не являются симметричными: поток всегда должен находиться слева, на вызывающей стороне, а таблица — всегда справа.

Соединения потоков и таблиц не являются оконными. Когда на стороне потока появляется новая запись, Kafka Streams выполняет поиск ключа в таблице справа. Проверка отметок времени для обеих сторон не выполняется, если только не используются хранилища состояний с версиями, о которых мы поговорим в следующем разделе.

Чтобы захватить результат соединения, необходимо передать объект ValueJoiner, который принимает значения с обеих сторон и создает новое значение, которое может быть того же типа, что и значения с любой из сторон, или совершенно нового. Соединения «поток — таблица» могут быть внутренними (как показано в этом примере) или внешним левым.

Соединение вычисляется только для вновь поступающих записей в потоке, обновления записей в таблице просто обновляют значения ключей, но не выдают новых результатов соединения. Рассмотрим пару иллюстраций, которые помогут понять, что это значит. Схема на рис. 8.18 иллюстрирует появление новой записи в KStream.

image

Рис. 8.18. Соединение «поток — таблица» генерирует результат, только когда обновление случается на стороне потока

Когда поступит новая запись, Kafka Streams выполнит поиск ключа в KTable и применит логику ValueJoiner, чтобы получить результат соединения. Теперь рассмотрим случай, когда обновление получает KTable (рис. 8.19).

image

Рис. 8.19. При обновлении таблицы результат соединения не обновляется, обновляется только сама таблица

Как показано здесь, когда объект KTable получает новую запись, он просто обновляет соответствующий ключ, но не вызывает никаких действий по вычислению соединения. В этом примере соединения «поток — таблица» время не учитывается, так как KTable хранит относительно статичные данные о пользователях. Но в ситуациях, когда время или временная семантика имеют большое значение, этот аспект нужно учитывать.

Версионированные таблицы KTable


На рис. 8.20 показан случай реализации соединений «поток — таблица» с ограничением по времени.

image

Рис. 8.20. Обновление KTable и соединение с запоздавшей записью KStream приводит к ошибочному результату

На рис. 8.20 показан KStream с заказом или товаром, стоимость которого постоянно меняется, и таблица KTable, содержащая цены. Когда пользователь размещал заказ в момент времени T2, цена была установлена на уровне $6, а в момент времени T3 цена была увеличена до $9. Но запись в KStream, размещенная в момент времени T2, запоздала. В результате происходит соединение с заказом, созданным в момент времени T2, с ценой, обновившейся в момент времени T3, то есть клиент будет вынужден заплатить 8 * $9 = 72 вместо ожидаемых 8 * $6 = 48.

Такой результат обусловлен тем, что, когда в KTable поступает новая запись, Kafka Streams автоматически применяет обновление к таблице и любое последующее соединение с записью в KStream будет использовать текущую соответствующую запись в таблице. Нам нужно предотвратить проблему ошибочного определения стоимости таких «запоздавших» заказов, добавив в соединения поддержку семантики времени, чтобы обеспечить учет отметки времени в записи на стороне потока, и сохранив в KTable значение, соответствующее предыдущей отметке времени. На рис. 8.21 проиллюстрирована эта идея.

image

Рис. 8.21. Таблица с историческими записями, содержащими отметки времени, позволяет выполнять корректные во времени соединения с записями в KStream

Здесь показано, что, даже если заказ запоздает, соединение все равно будет использовать правильную цену из таблицы, соответствующую моменту времени T2, что даст ожидаемый и верный результат. Но как добавить поддержку учета времени обновлений записей в KTable? Для этого достаточно использовать версионированное хранилище состояний, но перед этим нужно создать версионированный экземпляр StoreSupplier, как показано в листинге 8.8.

Листинг 8.8. Создание версионированного хранилища состояния

image


Обратите внимание на второй параметр. Это объект Duration, который определяет, как долго должны оставаться доступными старые записи. Существует перегруженная версия метода, которая принимает объект Duration и определяет размеры сегментов для хранения старых записей.

Далее нужно подключить StoreSupplier к KTable, как показано в листинге 8.9.

Листинг 8.9. Подключение StoreSupplier к KTable

image

Этими двумя шагами мы подключили версионированное хранилище состояния к KTable, что позволит выполнять соединения, корректные во времени.

Посмотрите книгу «Kafka Streams в действии. Приложения и микросервисы, управляемые событиями. 2-е изд.» на нашем сайте.

» Оглавление
» Отрывок

По факту оплаты бумажной версии книги на e-mail высылается электронная книга.
Для Хаброжителей скидка 25 % по купону — Kafka Streams

Комментарии (0)