Всем привет. Уже в декабре в OTUS стартует новый поток курса Software Architect. В преддверии старта курса хочу поделиться с вами переводом интересной статьи. А также предлагаю посмотреть запись демо-урока по теме: "Идемпотентность и коммутативность API в очередях и HTTP".
Один из сложных вопросов, с которыми мы постоянно сталкиваемся при проектировании приложений и систем в целом, заключается в том, как эффективно организовать обмен информацией между компонентами, сохраняя при этом достаточную гибкость для изменения интерфейсов без чрезмерного воздействия на другие части системы. Чем более конкретен и оптимизирован интерфейс, тем больше вероятность того, что он будет настолько ситуативным, что для его изменения потребуется его полностью переписывать. И наоборот; универсальные шаблоны интеграции могут быть достаточно адаптивными и широко поддерживаемыми, но, увы, за счет производительности.
События (Events) предлагают подход в стиле принципа Златовласки, в котором API реального времени (real-time APIs) могут использоваться в качестве основы для приложений, которые являются гибкими, но в то же время высокопроизводительными; слабосвязанными, но эффективными.
События можно рассматривать как строительные блоки для множества других структур данных. Как правило, они фиксируют факт того, что что-то произошло, и момент времени, в который это произошло. Событие может фиксировать эту информацию с различными уровнями детализации: от простого уведомления до подробного события, описывающего полное состояние того, что произошло.
Из событий мы можем агрегировать состояния (state) - те самые состояния, которое мы хорошо знаем и любим в СУБД и NoSQL хранилищах. Помимо того, что события служат базой для состояний, они также могут использоваться для асинхронного запуска действий в другой части системы - это основа для событийно-ориентированных архитектур. Таким образом, мы можем создавать потребителей (consumers) в соответствии с нашими требованиями - как без, так и с сохранением состояния в целях отказоустойчивости. Производители (producers) могут хранить состояние, но это не обязательно, поскольку потребители могут восстанавливать его самостоятельно из полученных событий.
Если вы задумаетесь о сфере бизнеса, в которой вы работаете, вам, вероятно, может прийти на ум множество примеров событий. Они могут быть инициированы человеком, а могут быть сгенерированы машиной. Они могут содержать обширную полезную нагрузку (payload) или быть, по сути, всего лишь уведомлением. Например:
Событие:
userLogin
Полезная нагрузка:
zbeeblebrox
вошел в систему в2020-08-17 16:26:39 BST
Событие:
CarParked
Полезная нагрузка: Регистрация парковки автомобиля
A42 XYZ
в2020-08-17 16:36:27
в парковочном местеX42
Событие:
orderPlaced
Полезная нагрузка:
Робин
заказалчетыре банки консервированной фасоли
на общую сумму?2.25
в2020-08-17 16:35:41 BST
Эти события можно использовать для непосредственного запуска действий в разных частях вашей системы (например, службы, которая обрабатывает заказы в ответ на их размещение), а также их можно использовать в для предоставления совокупной информации (например, о текущем количестве занятых мест на автостоянке и, следовательно, о том, какие парковочные места доступны).
Итак, если события являются краеугольным камнем, на котором мы собираемся создавать наши приложения и сервисы, нам нужна технология, которая наилучшим образом поддержит нас в наших начинаниях - и именно здесь на помощь приходит Apache Kafka®. Kafka - это масштабируемая платформа потоковой передача событий, которая предоставляет:
Pub/Sub
Для публикации (записи) и подписки (чтения) потоков событий, включая непрерывный импорт/экспорт ваших данных из других систем.
Обработку потоков с отслеживанием состояния
Для длительного и надежного хранения потоков событий столько, сколько вам нужно.
Хранилище
Для обработки потоков событий по мере их поступления или ретроспективно.
Kafka спроектирован на концепции логирования. Используя эту простую, но мощную концепцию распределенного неизменяемого лога, разрешающего только добавления, мы можем фиксировать и сохранять события, происходящие в нашей бизнес системе, эффективным и масштабируемым способом. Эти события могут быть доступны множеству пользователей по подписке, а также могут обрабатываться и в дальнейшем агрегироваться либо для прямого использования, либо для хранения в других системах, таких как СУБД, озера данных и NoSQL хранилища.
В оставшейся части этой статьи я исследую API-интерфейсы, доступные в Apache Kafka, и продемонстрирую, как их можно использовать в системах и приложениях, которые вы создаете.
API производителя и потребителя
Одно из преимуществ такой системы, как Kafka, заключается в том, что производители и потребители разделены, что, среди прочего, означает, что мы можем производить данные без потребителя (а также из-за такого разделения мы можем масштабировать этот процесс). Когда происходит событие, мы отправляем его Kafka - вот и все. Все, что нам нужно знать, - это детали кластера Kafka и топик (topic или тема - способ организации данных в Kafka, что-то вроде таблиц в СУБД), в которую мы хотим отправить событие.
Для Kafka доступны клиенты на многих языках. Вот пример создания события для Kafka на Go:
package main
import (
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
topic := "test_topic"
p, _ := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092"})
defer p.Close()
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic,
Partition: 0},
Value: []byte("Hello world")}, nil)
}
Поскольку Kafka надежно хранит события, это означает, что они будут доступны именно тогда, когда мы захотим потребить их, ровно до тех пор, пока мы не сочтем их устаревшими (что можно настроить отдельно для каждого топика).
Событие, записанное в топике Kafka, доступно для чтения одному или нескольким потребителям. Потребители могут вести себя самым обычным образом в режиме pub/sub, получая новые события по мере их поступления, а также повторно потреблять произвольные события события, если это нужно приложению. Эта фича Kafka с повторным потреблением, благодаря его надежному и масштабируемому уровню хранения, является огромным преимуществом для многих важных сценариев использования на практике, таких как машинное обучение и A/B-тестирование, где необходимы как оперативные, так и исторические данные. Это также обязательное требование в регулируемых отраслях, где данные должны храниться в течение многих лет, чтобы соответствовать требованиям законодательства. Традиционные системы обмена сообщениями, такие как RabbitMQ, ActiveMQ, не могут удовлетворить такие запросы.
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
topic := "test_topic"
cm := kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"go.events.channel.enable": true,
"group.id": "rmoff_01"}
c, _ := kafka.NewConsumer(&cm)
defer c.Close()
c.Subscribe(topic, nil)
for {
select {
case ev := <-c.Events():
switch ev.(type) {
case *kafka.Message:
km := ev.(*kafka.Message)
fmt.Printf("? Message '%v' received from topic '%v'\n", string(km.Value), string(*km.TopicPartition.Topic))
}
}
}
}
Когда потребитель подключается к Kafka, он предоставляет идентификатор группы потребителей (Consumer Group). Концепция группы потребителей дает нам две следующие функциональные возможности. Во-первых, Kafka отслеживает точку в топике, до которой потребитель дочитал события, чтобы при повторном подключении он мог продолжить чтение с той точки, на которой он остановился. Во-вторых, приложение-потребитель может захотеть масштабировать чтение на несколько своих инстансов, образуя группу потребителей, которая позволяет обрабатывать ваши данные параллельно. В этом случае Kafka будет распределять события для каждого потребителя в группе на основе доступных разделов топика и будет активно управлять группой, нужно ли членам последовательно отключиться или подключиться (например, в случае сбоя одного из инстансов потребителя).
Это означает, что несколько сервисов могут использовать одни и те же данные без какой-либо взаимозависимости между ними. Те же данные также могут быть перенаправлены в хранилища в другом месте с помощью Kafka Connect API, который мы обсудим ниже.
Producer и Consumer API доступны в вариантах на Java, C/C++, Go, Python, Node.js и многих других. Но что, если ваше приложение хочет использовать HTTP вместо нативного протокола Kafka? На этот случай есть REST Proxy.
Использование REST API с Apache Kafka
Допустим, мы пишем приложение для устройства на интеллектуальной автостоянке. Полезная нагрузка для события, регистрирующего факт того, что автомобиль только что заняла место, может выглядеть следующим образом:
{
"name": "NCP Sheffield",
"space": "A42",
"occupied": true
}
Мы могли бы поместить это событие в топик Kafka, который также будет записывать время как часть метаданных события. Создание данных в Kafka с помощью Confluent REST Proxy - это простой REST-запрос:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{ "name": "NCP Sheffield", "space": "A42", "occupied": true }}]}' "http://localhost:8082/topics/carpark"
Любое приложение может потреблять данные из этого топика, используя собственный Consumer API, который мы видели выше, или с помощью REST-вызова. Как и в случае с собственным Consumer API, потребители, использующие REST API, также являются членами Consumer Group, которая называется подпиской (subscription). Таким образом, с помощью REST API вы должны в первую очередь объявить своего потребителя, и подписку:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/rmoff_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}' http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription
После этого вы можете читать события:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records
[
{
"topic": "carpark",
"key": null,
"value": {
"name": "Sheffield NCP",
"space": "A42",
"occupied": true
},
"partition": 0,
"offset": 0
}
]
Если вам нужно получить несколько событий, вы получите их в виде батча в ответе на запрос. Если ваш клиент хочет проверить наличие новых событий, ему нужно будет снова отправить REST-запрос.
Мы рассмотрели, как мы можем загружать и получать данные из топиков Kafka. Но в большинстве случаев нам нужно больше, чем просто pub/sub. Мы хотим взять поток событий и с его помощью увидеть большую картину - все автомобили, которые приходят и уходят, сколько парковочных мест свободно прямо сейчас? Или, может быть, мы хотели бы иметь возможность подписаться на поток обновлений только для одной конкретной автостоянки.
Условные уведомления, потоковая обработка и материализованные представления
Думать об Apache Kafka только как о реализации pub/sub - это как думать об iPhone как о устройстве исключительно для совершения телефонных звонков. Я имею в виду, что не неправильно называть это одной из его возможностей… но он способен делать гораздо больше. Apache Kafka включает возможности потоковой обработки через Kafka Streams API. Это многофункциональная клиентская библиотека на Java для выполнения потоковой обработки данных в Kafka с сохранением состояния в масштабе и на нескольких машинах. Kafka Streams, широко используемый в таких компаниях, как Walmart, Ticketmaster и Bloomberg, также является основой для ksqlDB.
ksqlDB - это база данных потоковой передачи событий, специально созданная для приложений потоковой обработки. Она предоставляет API на основе SQL для запроса и обработки данных в Kafka. В числе множества функций ksqlDB можно отметить фильтрацию, преобразование и объединение данных из потоков и таблиц в реальном времени, создание материализованных представлений путем агрегирования событий и многое-многое другое.
Для работы с данными в ksqlDB нам сначала нужно объявить схему:
CREATE STREAM CARPARK_EVENTS (NAME VARCHAR,
SPACE VARCHAR,
OCCUPIED BOOLEAN)
WITH (KAFKA_TOPIC='carpark',
VALUE_FORMAT='JSON');
ksqlDB развертывается как кластерное приложение, и приведенная выше инициализация может выполняться при запуске или непосредственно клиентом, если это потребуется. После этого любой клиент может подписаться на поток изменений из исходного топика, но с примененным фильтром. Например, чтобы получить уведомление, когда освобождается место на определенной автостоянке, можно запустить:
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS,
SPACE
FROM CARPARK_EVENTS
WHERE NAME='Sheffield NCP'
AND OCCUPIED=false
EMIT CHANGES;
В отличие от SQL-запросов, к которым вы, возможно, уже привыкли, этот запрос является непрерывным (обозначается предложением EMIT CHANGES). Непрерывные запросы, известные как push-запросы, будут продолжать возвращать любые новые совпадения с предикатом по мере возникновения событий, сейчас и в будущем, пока они не будут завершены. ksqlDB также поддерживает pull-запросы (которые мы рассмотрим ниже), и они ведут себя так же, как запросы к обычной СУБД, возвращая результаты поиска в определенный момент времени. Таким образом, ksqlDB поддерживает как потоковую передачу, так и статическое состояние, что также потребуется на практике большинству приложений в зависимости от выполняемых задач.
ksqlDB включает в себя увесистый REST API, вызов которого для SQL приведенного выше будет выглядеть следующим образом:
curl --http2 'http://localhost:8088/query-stream' --data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'\''yyyy-MM-dd HH:mm:ss'\'') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='\''Sheffield NCP'\'' and OCCUPIED=false EMIT CHANGES;"}'
Этот вызов результирует в потоковом ответе с заголовком от сервера, а затем при получении любых подходящих событий из исходного топика они будут отправляться клиенту:
{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}
…
["2020-08-05 16:02:33","A42"]
…
…
…
["2020-08-05 16:07:31","D72"]
…
Мы также можем использовать ksqlDB для определения и заполнения новых потоков данных. Предваряя SELECT
выражением CREATE STREAM streamname AS
мы можем направить вывод непрерывного запроса в топик Kafka. Таким образом, мы можем использовать ksqlDB для выполнения преобразований, объединений, фильтрации и т.д. над событиями, которые мы отправляем в Kafka. ksqlDB поддерживает концепцию таблицы как типа объекта первого класса, и мы могли бы использовать это, чтобы дополнить события автостоянки информацией о самом автостоянке:
CREATE STREAM CARPARKS AS
SELECT E.NAME AS NAME, E.SPACE,
R.LOCATION, R.CAPACITY,
E.OCCUPIED,
CASE
WHEN OCCUPIED=TRUE THEN 1
ELSE -1
END AS OCCUPIED_IND
FROM CARPARK_EVENTS E
INNER JOIN
CARPARK_REFERENCE R
ON E.NAME = R.NAME;
Вы можете заметить, что мы также использовали оператор CASE для применения логики к данным, позволяющей нам реализовать текущее количество доступных парковочных мест. Вышеупомянутый CREATE STREAM заполняет топик Kafka, который выглядит следующим образом:
+----------------+-------+----------+----------------------------+----------+--------------+
|NAME |SPACE |OCCUPIED |LOCATION |CAPACITY |OCCUPIED_IND |
+----------------+-------+----------+----------------------------+----------+--------------+
|Sheffield NCP |E48 |true |{LAT=53.4265964, LON=-1.8426|1000 |1 |
| | | |386} | | |
Наконец, давайте посмотрим, как мы можем реализовать агрегирование с отслеживанием состояния в ksqlDB и запросить его у клиента. Чтобы создать материализованное представление, вы запускаете SQL, который включает агрегатные функции:
CREATE TABLE CARPARK_SPACES AS
SELECT NAME,
SUM(OCCUPIED_IND) AS OCCUPIED_SPACES
FROM CARPARKS
GROUP BY NAME;
Это состояние поддерживается распределенными узлами ksqlDB и может быть запрошено напрямую с помощью REST API:
curl --http2 'http://localhost:8088/query-stream' --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
В отличие от потокового ответа, который мы видели выше, запросы к состоянию (известные как "pull-запросы ", а не" push-запросы ") возвращаются немедленно, а сразу же завершаются:
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[30]
Если приложению нужно получить самое актуальное значение, оно может повторно отправить запрос - значение может измениться или же остаться таким же
curl --http2 'http://localhost:8088/query-stream' --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[29]
Существует также клиент Java для ksqlDB и созданные сообществом клиенты Python и Go.
Интеграция с другими системами
Одно из преимуществ использования Apache Kafka в качестве масштабируемого и сохраняющего состояние брокера для асинхронного обмена сообщениями состоит в том, что те же данные, которыми вы обмениваетесь между своими приложениями, также могут быть задействованы в потоковой обработке (как мы видели выше), и напрямую поставляться зависимым системам.
Продолжая пример приложения, которое отправляет событие каждый раз, когда машина припарковывается или покидает место, вполне вероятно, что мы захотим использовать эту информацию в других местах, например:
в аналитике для анализа поведения во время парковки и трендов
в машинном обучении для прогнозирования спроса на парковочные места
в потоках данных для сторонних подрядчиков
Используя Apache Kafka Connect API, вы можете определять потоковую интеграцию с системами как внутри, так и вне Kafka. Например, для потоковой передачи данных из Kafka в S3 в реальном времени вы можете запустить:
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config -d ' {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "carpark",
"s3.bucket.name": "rmoff-carparks",
"s3.region": "us-west-2",
"flush.size": "1024",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat"
}'
Теперь те же данные, которые управляют уведомлениями в вашем приложении и участвуют в формировании состояния, которое ваше приложение может запрашивать напрямую, также передаются в S3. Каждый сценарий отделен от другого. Если мы впоследствии захотим передать те же данные в другое место, например в Snowflake, мы просто добавим еще одну конфигурацию Kafka Connect; другие потребители не будут затронуты. Kafka Connect также может передавать данные в Kafka. Например, таблица CARPARK_REFERENCE
, которую мы используем в примере с ksqlDB выше, может транслироваться с помощью системы отслеживания измененных сданных (CDC - change data capture) из базы данных, которая ведет себя как система записи для этих данных.
Заключение
Apache Kafka предлагает масштабируемую платформу потоковой передачи событий, с помощью которой вы можете создавать приложения на основе этой мощной концепции. Используя события в качестве основы для соединения ваших приложений и сервисов, вы получаете множество преимуществ, включая слабую связанность, автономность сервисов, эластичность, гибкую эволюционируемость и устойчивость.
Вы можете использовать API Kafka и окружающей его экосистемы, включая ksqlDB, как для потребления на основе подписки, так и для поиска по ключу/значению в материализованных представлениях без необходимости в дополнительных хранилищах данных. API доступны как в виде собственных клиентов, так и через REST.
Чтобы узнать больше об Apache Kafka, посетите developer.confluent.io. Confluent Platform - это дистрибутив Apache Kafka, который включает все компоненты, обсуждаемые в этой статье. Он доступен как локально, так и в виде управляемого сервиса под названием Confluent Cloud. На GitHub вы можете найти Docker Compose и примеры кода для этой статьи, чтобы потестировать все это самостоятельно. Если вы хотите узнать больше о построении событийно-ориентированных систем, на основе Kafka, обязательно прочтите превосходную книгу Бена Стопфорда «Проектирование событийно-ориентированных систем».
Посмотреть запись демо-урока по теме: "Идемпотентность и коммутативность API в очередях и HTTP".
devopg
самое не понятное
если допустим 10 машин на каждой по ksqldb:
если создать допустим таблицу, то на какой машине окажутся они? Автоматически синхронизируются по всем 10 экземплярам? Какая машина разгребает очередь в ktable?
тоесть все вопросы направлены на не ясность горизонтального масштабирования ksqldb.