Не за горами выход релиза 4.0 Apache Kafka. Согласно графику релиза, 15 января состоится code freeze, а через пару недель или позже, после стабилизации, версия 4.0 увидит свет. Самое время присмотреться, что же в неё вошло.
Развитие Apache Kafka происходит в рамках процесса процесса работы с KIP'ами, Kafka Improvement Proposals. В релиз 4.0 их вошло 37 штук. Из них наиболее интересной мне показалась доработка, связанная с введением новой концепции, которую разработчики смело сравнили с очередями. Давайте посмотрим, что у них получилось. Остальные доработки, наверное, важны не менее.
Очереди!
"Очередям" в релизе посвящено 2 KIP'а:
Queues for Kafka |
Очереди в kafka |
|
Разработчики наконец признали важность полноценной реализации в Kafka сценария коллективной и конкурентной обработки потребителями событий, то есть без необходимости эксклюзивного назначения доступа к партиции топика. Для реализации данной функциональности, в дополнение к хорошо известным группам потребителей добавляются разделяемые группы (share group) — число потребителей в таких группах может превышать число партиций. Защита доступа к записи обеспечивается механизмом блокировки (по умолчанию на 30с, может быть переопределено свойством Для доступа к топикам посредством разделяемых групп вводится новый интерфейс | ||
Administration of groups |
Администрирование групп |
|
В продолжение предыдущего KIP'а, в связи с расширением номенклатуры типов групп, которое продолжится и далее (см. KIP-1071), был добавлен новый способ получения информации о группах посредством утилиты |
Как обычно, новая функциональность заявлена как экспериментальная, изменяющаяся и не рекомендуется к применению в продуктиве. Использование нового класса, в целом, аналогично старому.
Поддерживаются различные сценарии подтверждения, неявный:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays. asList("foo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration. ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
doProcessing(record);
}
}
Альтернативно, можно использовать consumer.commitSync()
или consumer.commitAsync()
для передачи подтверждений, но это менее эффективно, поскольку приводит к дополнительным вызовам. Возможно также подтверждение на уровне отдельных записей:
Properties props = new Properties();
props.setProperty("bootstrap. servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays. asList("foo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
doProcessing(record);
consumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (Exception e) {
consumer.acknowledge(record, AcknowledgeType.REJECT);
break;
}
}
consumer.commitSync();
}
Транзакционное поведение групп определяется свойством group.share.isolation.level
. Оно применяется ко всей группе, а не к отдельному потребителю. Свойство может принимать значение read_committed
или read_uncommitted
(по-умолчанию).
В рамках данного KIP также добавлена новая утилита командной строки kafka-console-share-consumer.sh
. Помимо вышеперечисленных, добавлено множество новых свойств, изменено несколько старых, появились новые API. К сожалению, новой -perf.sh утилиты пока нет.
В общем, это новая, обширная и интересная тема, которая заслуживает подробного разбора.
Что ещё интересного?
Вот ещё несколько улучшений релиза:
allow custom processor wrapping |
Возможность пользовательских расширений потоковых процессоров |
|
Добавлена возможность, наподобие аспектов, добавлять обёртку вокруг процессоров в Streams. Например, для добавления логирования или отладки. В поставке готовых содержательных врапперов нет, но идея интересная. Для включения опции надо задать конфигурацию | ||
Allow Foreign Key Extraction from Both Key and Value in KTable Joins |
Возможность join'ов на основе ключа и значения |
|
Утверждается, что таким образом в некоторых потоковых сценариях можно предотвратить оверхед. |
Add duration based offset reset option for consumer clients |
Добавление возможность указания сдвига на основе смещения по времени |
|
В релизе 3.6 была введена возможность долговременного (многоуровневого) хранения данных (KIP-405, см. подробно тут). С учётом, что теперь данные в кластере могут храниться годы, прежних возможностей по указанию смещения ( | ||
Make remote log manager thread-pool configs dynamic |
Динамическая настройка RemoteLogManager |
|
Если вы уже пользуетесь долговременным хранением, то вас заинтересует возможность налету менять настройки пула потоков RemoteLogManager, системного механизма, находящегося под капотом данной фичи. |
Allow disabling heartbeats replication in MirrorSourceConnector |
Возможность отключения репликации heartbeat-топиков в MirrorSourceConnector |
|
Данное улучшение закрывает потенциально возможную проблему, когда при настройке репликации MirrorMaker 2.0 вы создаёте несколько коннекторов (например, для топиков различными настройками сжатия). Дело в том, что ранее в таком случае каждый из коннекторов безусловно добавлял себе в обработку служебные heartbeat-топики. Теперь их можно явно исключить из репликации. | ||
Allow the replication of user internal topics |
Возможность репликации internal-топиков |
|
До релиза 4.0 топики, название которых заканчивалось на "internal" исключались из репликации. Как оказалось, так могут называться и пользовательские топики с данными, поэтому условие исключение было дополнено, и теперь нереплицируемые топики должны дополнительно начинаться на "mm2". | ||
KIP-724 удалил совместимость с клиентами и брокерами версии 2.1 и ниже. Обновляйтесь!
Также в релизе в модулях broker
и tools
удалена поддержка Java 11.
Важными с точки зрения совместимости кода являются обновления библиотек: