Приветствую читателей! Меня зовут Темирлан, и на протяжении последних нескольких лет я активно использовал Apache Kafka в проектах в сферах финансовых технологий FinTech. Этот опыт позволил мне не только глубоко изучить возможности и преимущества Kafka, но и столкнуться с уникальными вызовами, связанными с обработкой и управлением большими потоками данных в критически важных системах. В этой статье я хочу поделиться своими знаниями и опытом работы с Schema Registry, ключевым компонентом для управления схемами данных в Apache Kafka.
Apache Kafka является мощным инструментом для обработки и передачи потоковых данных в реальном времени, который находит широкое применение в различных индустриях для обработки огромных объемов данных с низкой задержкой. В центре этой платформы лежит способность эффективно распределять данные между множеством производителей (producers) и потребителей (consumers), при этом поддерживая высокую пропускную способность и масштабируемость. Однако, с увеличением количества и разнообразия данных, возникает необходимость в управлении структурами этих данных, что обеспечивает Schema Registry. Этот компонент является критически важным для поддержания согласованности данных в Kafka, поскольку он управляет схемами сообщений и обеспечивает совместимость между различными версиями схем, что позволяет системам бесперебойно обмениваться данными даже при изменении структуры сообщений.
Schema Registry — это централизованное хранилище для схем сообщений, используемых в Apache Kafka, обеспечивающее управление и контроль версий схем данных. Основная задача Schema Registry — обеспечить, чтобы все сообщения, отправляемые в Kafka, соответствовали определенной схеме, что предотвращает возможные ошибки данных, вызванные несоответствием или изменением структуры данных. Schema Registry поддерживает проверку схем и управление версиями, позволяя разработчикам безопасно модифицировать схему данных без риска нарушения обработки или хранения данных.
Интеграция Schema Registry с Kafka осуществляется через использование специальных сериализаторов и десериализаторов. Например, при использовании Apache Kafka для отправки сообщений, производители данных могут автоматически регистрировать схемы с помощью следующего кода на Java:
import org.apache.kafka.common.serialization.Serializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
var props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
var producer = new KafkaProducer<String, GenericRecord>(props);
Этот код настраивает производителя Kafka для использования KafkaAvroSerializer
, который взаимодействует с Schema Registry для регистрации схем и обеспечивает, что все участники системы работают с согласованными и верифицированными схемами, что критически важно для эффективности и надежности потоков данных.
Управление схемами играет ключевую роль в поддержании качества и согласованности данных в распределенных системах, таких как Apache Kafka. Без эффективного управления схемами, системы могут столкнуться с серьезными проблемами, включая schema drift, когда изменения в структуре данных не согласованы между производителями и потребителями данных. Это может привести к ошибкам в обработке данных, потере данных или даже полному сбою системы при обработке несоответствующих сообщений.
Централизованное управление схемами, предоставляемое решениями вроде Schema Registry, предлагает множество преимуществ:
Согласованность и совместимость схем: управление версиями схем и обеспечение обратной и вперёд совместимости позволяет системам эффективно адаптироваться к изменениям без риска нарушения работы приложений.
Упрощение разработки и обслуживания: разработчики могут вносить изменения в схемы без страха нарушить процессы обработки данных, что сокращает время на тестирование и внедрение новых функций.
Улучшенная надежность данных: гарантия, что все сообщения соответствуют зарегистрированным схемам, снижает вероятность ошибок данных и улучшает общую надежность системы.
Примером практического применения централизованного управления схемами может служить сценарий, когда производитель данных вносит изменение в структуру отправляемых сообщений. Благодаря Schema Registry, изменение схемы регистрируется и верифицируется заранее, а потребители данных получают уведомление о новой версии схемы. Это позволяет потребителям данных подготовиться к обработке новой структуры данных без перебоев в их сервисах. Пример кода, иллюстрирующий это:
Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-awesome-topic"));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord value = record.value();
// Обработка полученных данных с учетом актуальной схемы
}
}
} finally {
consumer.close();
}
Этот код демонстрирует, как потребитель Kafka использует KafkaAvroDeserializer
для десериализации сообщений в соответствии с актуальной схемой, предоставленной через Schema Registry. Таким образом, централизованное управление схемами не только упрощает обслуживание и разработку, но и повышает общую устойчивость и надежность системы обработки данных.
Теперь о каждом пункте подробнее!
Совместимость схем: Schema Registry поддерживает как обратную, так и вперёд совместимость схем. Это означает, что система может обрабатывать как старые, так и новые форматы данных без сбоев. Пример кода на Java показывает, как можно настроить сериализатор для отправки данных, учитывая обратную совместимость:
var producer = new KafkaProducer<String, GenericRecord>(props);
// Предположим, schema - это ваша Avro схема
var avroRecord = new GenericData.Record(schema);
// Заполнение avroRecord данными
producer.send(new ProducerRecord<String, GenericRecord>("your-awesome-topic", avroRecord));
producer.close();
Интеграция с API и сериализаторы/десериализаторы Kafka: Schema Registry тесно интегрирован с Kafka через API, которое позволяет сериализаторам и десериализаторам автоматически извлекать схемы для кодирования и декодирования сообщений. Ниже пример кода для десериализатора:
var consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList("topic"));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.println(record.value());
}
}
} finally {
consumer.close();
}
Версионирование схем: в основе управления схемами в Schema Registry лежит принцип версионирования. Каждый раз, когда производится изменение в схеме, оно регистрируется как новая версия в Registry. Это позволяет не только сохранять исторические версии схем, но и обеспечивать управление совместимостью между ними. Schema Registry поддерживает четыре режима совместимости: NONE
, BACKWARD
, FORWARD
, и FULL
. Например, режим гарантирует, что новые данные могут быть прочитаны старыми схемами, что критически важно для обеспечения стабильности систем при разработке и масштабировании.
var schemaString = "{\\"namespace\\": \\"example.avro\\", \\"type\\": \\"record\\", " +
"\\"name\\": \\"User\\", \\"fields\\": [{\\"name\\": \\"name\\", \\"type\\": \\"string\\"}]}";
var parser = new Schema.Parser();
var avroSchema = parser.parse(schemaString);
var client = new CachedSchemaRegistryClient("<http://localhost:8081>", 10);
int registeredSchemaId = client.register("user-value", new AvroSchema(avroSchema));
// Установка режима совместимости
client.updateCompatibility("user-value", "BACKWARD");
System.out.println("Schema registered with ID: " + registeredSchemaId);
В этом примере мы создаем новую схему для записей пользователя, регистрируем ее в Schema Registry и устанавливаем режим совместимости на BACKWARD
. Это обеспечивает, что все последующие версии схемы будут совместимы с предыдущими версиями, что позволяет старым приложениям корректно обрабатывать данные, произведенные новыми версиями приложений.
В данной статье мы рассмотрели ключевую роль Schema Registry в экосистеме Apache Kafka, начиная с обеспечения совместимости и версионирования схем до интеграции с API и сериализаторами/десериализаторами. Schema Registry служит важным инструментом для управления схемами сообщений, что необходимо для поддержания целостности и надежности данных в системах реального времени. Он предоставляет механизмы для обеспечения обратной и вперёд совместимости схем, что позволяет системам легко адаптироваться к изменениям без риска сбоев. Централизованное управление схемами через Schema Registry упрощает разработку и обслуживание распределенных систем, повышая их устойчивость и эффективность.
Комментарии (14)
kimi44
29.04.2024 10:10А как быстро стартануть то? Какой-то есть материал, который вам сильно помог? В каком виде описываются схемы? Как их закидывать? UI или по API? Какой-нибудь толковый 101 очень бы не помешал, но возможно это идея для следующей статьи ))
kimi44
29.04.2024 10:10Еще бы понять конкретно что avro решает. С моей точки зрения есть топики и в них можно писать произвольный JSON. Этот JSON можно валидировать, но почему тогда все время речь о сериализации и десериализации в статьях про avro. Он же уже лежит в json формате в топике =) Но возможно мне просто стоит подробнее почитать про эту технологию
bibmaster
29.04.2024 10:10Если у вас JSON, вам скорее всего не нужен schema registry. Это вообще штука заточенная чисто под avro. Дело в том что это формат сериализации, в котором отсутствуют метаданные. Там нет названий полей как в json или тегов как в protobuf. Это просто бинарно закодированные подряд все поля структуры. И не зная с какой схемой было закодировано сообщение, декодировать его невозможно. Поэтому avro без schema registry существовать практически не может. Выбирая avro надо всегда иметь в виду что все producers и consumers становятся зависимыми от schema registry. И сам он становится критичной частью всей инфраструктуры.
kimi44
29.04.2024 10:10У меня действительно во все топики складывается JSON, но есть потребность проверять, что в определенные топики залетают только JSON определенного вида(содержащие определенные ключи, иногда надо проверить и value). Если есть какой-то механизм, который позволил бы проверять данные и писать ошибки в логи, если что-то не удовлетворяет условиям это было бы идеально. Еще идеальнее, если бы это все можно было хранить прямо по месту в kafka топиках и желательно без подключения внешнего хранилища спек. Но я не знаю существует ли для кафка что-то похожее на мое описание, но если вы знаете такое, то напишите, пожалуйста.
temirlan100 Автор
29.04.2024 10:10В каком виде описываются схемы?
{ "type" : "record", "namespace" : "someSchema", "name" : "Employee", "fields" : [ { "name" : "Name" , "type" : "string" }, { "name" : "Age" , "type" : "int" } ] }
А как быстро стартануть то? Какой-то есть материал, который вам сильно помог?
Сейчас и не вспомню к сожалению) все через документацию
Как их закидывать? UI или по API?
API =) UI тулзы не смотрел
ptr128
29.04.2024 10:10Сделать реестр схем первичным источником мне не удалось, так как из схемы автоматически вычищаются комментарии. А если одна и та же схема используется не только в Kafka, то это уже проблема. Например, эти комментарии нужны Swagger.
SAPetrovich77
29.04.2024 10:10Странная статья. В самом начале приведен пример настройки поставщика, в котором почему-то задаются свойства получателя и десереализаторв используемых получателем. Дальше те-же непонятки... Ну и тема как реально реализуются декларируемые прелести использования avro как мне кажется совсем не раскрыта...
kimi44
Добрый день, очень интересно. Можете рассказать насколько внедрение схем влияет на пропускную способность или насколько оно увеличивает накладные расходы на проверку? Также интересно схемы складываются прямо в кафку в отдельный топик или сервис со схемами является отдельным хранилищем в которое кафка по апи ходит или еще как-то? Если данные летят в старом deprecated формате, то подразумевается, что мы как-то записываем логи или метрики об этом? Чтобы понять уменьшается ли у нас объем таких сообщений и можем ли мы убивать обратную совместимость или как мониторинг осуществляется на соответствие схемам?
temirlan100 Автор
Мы наблюдали, но просадки в performance не заметили
именно так, но мысль про отдельный топик очень интересна!
Логирование происходит в сервисе и в самих логах Kafka
Не уверен что до конца понял Ваш вопрос, есть топики с обязательным требованиям к обратной совместимости и если там что то Deprecated, происходил переезд в новый топик
kimi44
Я вероятно не понял что есть режим совместимости. Есть v1 и v2 какой-то схемы. Совместимость обеспечивается тем, что kafka преобразует v1 в v2 каким-то внутренним механизмом, если ей на вход прийдет v1? А если в v1 данных в принципе для такого преобразования не хватает.
Поэтому я решил, что вряд ли это имеется ввиду. Наверное просто у нас две спеки v1 и v2 обе считаются валидными, а уметь работать с обеими версиями это уже вопрос консьюмеров. Тогда предполагаю, что v1 можно поставить deprecated и потихоньку его вымывать по мере того, как продьюсеры v1 перестают генерить и вместо него генерят v2. Тогда для v1 должна собираться метрика, которая показывает какой объем до сих пор данных в deprecated v1 схеме продолжает приходить. Чтобы отловить момент, когда их начнет приходить 0 и убрать вообще поддержку схемы v1. Какая-то такая мысль была, но не знаю поддерживает ли сам процесс верификации на схему отправку метрик. Наверное нет и это опять же вопрос консьюмеров.
temirlan100 Автор
Просто алерты которые через логи можно как то аггрегировать и анализировать в дальнейшем, но чтобы слету как-то преобразовать, такого не знаю и не встречал в своей практики =)
kimi44
Я начал еще вокруг этой темы смотреть чего изучить и попал на курс Akka Classic Serialization. Там ребята нагнетают, что у Java сериализации какой-то ужасающий перфоманс, но вот вы говорите, что даже не заметили изменений, а плюсы проверки схемы уже получили. Но непонятно насколько в их утверждении есть рациональное зерно и при какой нагрузке это стрелять начинает. Но опять же, там Akka и Scala, непонятно насколько оно надо вообще.
bibmaster
Я немножко не из мира java, но, насколько я представляю, дело обстоит примерно так: клиент имеет свою схему, через registry (регистрируя) он может получить её id. Если id схемы в получаемом сообщении совпадает с клиентской, всё просто. Если нет, клиент запрашивает схему сообщения из registry и если она совместима должен построить конвертер, который декодирует сообщение по старой (или может быть новой) схеме и по правилам эволюции схемы преобразует его в объект текущей схемы.
bibmaster
Абсолютно никак не влияет. Брокеру без разницы что в него кладут, он ничего не проверяет. Schema registry это то, с чем работают клиенты через отдельный api. Оно может быть вообще в другой кафке если что, кафка для этого сервиса просто хранилище. Режимы совместимости в avro просто накладывают ограничения на возможности модификации схем, так чтобы клиент имея локально схему версии X мог декодировать сообщение закодированное со схемой версии Y. Соответственно schema registry при обновлении схемы может такие ограничения проверять и не даст изменить схему если совместимость нарушается.