Приветствую читателей! Меня зовут Темирлан, и на протяжении последних нескольких лет я активно использовал Apache Kafka в проектах в сферах финансовых технологий FinTech. Этот опыт позволил мне не только глубоко изучить возможности и преимущества Kafka, но и столкнуться с уникальными вызовами, связанными с обработкой и управлением большими потоками данных в критически важных системах. В этой статье я хочу поделиться своими знаниями и опытом работы с Schema Registry, ключевым компонентом для управления схемами данных в Apache Kafka.

Apache Kafka является мощным инструментом для обработки и передачи потоковых данных в реальном времени, который находит широкое применение в различных индустриях для обработки огромных объемов данных с низкой задержкой. В центре этой платформы лежит способность эффективно распределять данные между множеством производителей (producers) и потребителей (consumers), при этом поддерживая высокую пропускную способность и масштабируемость. Однако, с увеличением количества и разнообразия данных, возникает необходимость в управлении структурами этих данных, что обеспечивает Schema Registry. Этот компонент является критически важным для поддержания согласованности данных в Kafka, поскольку он управляет схемами сообщений и обеспечивает совместимость между различными версиями схем, что позволяет системам бесперебойно обмениваться данными даже при изменении структуры сообщений.

Schema Registry — это централизованное хранилище для схем сообщений, используемых в Apache Kafka, обеспечивающее управление и контроль версий схем данных. Основная задача Schema Registry — обеспечить, чтобы все сообщения, отправляемые в Kafka, соответствовали определенной схеме, что предотвращает возможные ошибки данных, вызванные несоответствием или изменением структуры данных. Schema Registry поддерживает проверку схем и управление версиями, позволяя разработчикам безопасно модифицировать схему данных без риска нарушения обработки или хранения данных.

Интеграция Schema Registry с Kafka осуществляется через использование специальных сериализаторов и десериализаторов. Например, при использовании Apache Kafka для отправки сообщений, производители данных могут автоматически регистрировать схемы с помощью следующего кода на Java:

import org.apache.kafka.common.serialization.Serializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

var props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

var producer = new KafkaProducer<String, GenericRecord>(props);

Этот код настраивает производителя Kafka для использования KafkaAvroSerializer, который взаимодействует с Schema Registry для регистрации схем и обеспечивает, что все участники системы работают с согласованными и верифицированными схемами, что критически важно для эффективности и надежности потоков данных.

Управление схемами играет ключевую роль в поддержании качества и согласованности данных в распределенных системах, таких как Apache Kafka. Без эффективного управления схемами, системы могут столкнуться с серьезными проблемами, включая schema drift, когда изменения в структуре данных не согласованы между производителями и потребителями данных. Это может привести к ошибкам в обработке данных, потере данных или даже полному сбою системы при обработке несоответствующих сообщений.

Централизованное управление схемами, предоставляемое решениями вроде Schema Registry, предлагает множество преимуществ:

  1. Согласованность и совместимость схем: управление версиями схем и обеспечение обратной и вперёд совместимости позволяет системам эффективно адаптироваться к изменениям без риска нарушения работы приложений.

  2. Упрощение разработки и обслуживания: разработчики могут вносить изменения в схемы без страха нарушить процессы обработки данных, что сокращает время на тестирование и внедрение новых функций.

  3. Улучшенная надежность данных: гарантия, что все сообщения соответствуют зарегистрированным схемам, снижает вероятность ошибок данных и улучшает общую надежность системы.

Примером практического применения централизованного управления схемами может служить сценарий, когда производитель данных вносит изменение в структуру отправляемых сообщений. Благодаря Schema Registry, изменение схемы регистрируется и верифицируется заранее, а потребители данных получают уведомление о новой версии схемы. Это позволяет потребителям данных подготовиться к обработке новой структуры данных без перебоев в их сервисах. Пример кода, иллюстрирующий это:

Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-awesome-topic"));
try {
    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, GenericRecord> record : records) {
            GenericRecord value = record.value();
            // Обработка полученных данных с учетом актуальной схемы
        }
    }
} finally {
    consumer.close();
}

Этот код демонстрирует, как потребитель Kafka использует KafkaAvroDeserializer для десериализации сообщений в соответствии с актуальной схемой, предоставленной через Schema Registry. Таким образом, централизованное управление схемами не только упрощает обслуживание и разработку, но и повышает общую устойчивость и надежность системы обработки данных.

Теперь о каждом пункте подробнее!

Совместимость схем: Schema Registry поддерживает как обратную, так и вперёд совместимость схем. Это означает, что система может обрабатывать как старые, так и новые форматы данных без сбоев. Пример кода на Java показывает, как можно настроить сериализатор для отправки данных, учитывая обратную совместимость:

var producer = new KafkaProducer<String, GenericRecord>(props);
// Предположим, schema - это ваша Avro схема
var avroRecord = new GenericData.Record(schema);
// Заполнение avroRecord данными
producer.send(new ProducerRecord<String, GenericRecord>("your-awesome-topic", avroRecord));
producer.close();

Интеграция с API и сериализаторы/десериализаторы Kafka: Schema Registry тесно интегрирован с Kafka через API, которое позволяет сериализаторам и десериализаторам автоматически извлекать схемы для кодирования и декодирования сообщений. Ниже пример кода для десериализатора:

var consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList("topic"));
try {
    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, GenericRecord> record : records) {
            System.out.println(record.value());
        }
    }
} finally {
    consumer.close();
}

Версионирование схем: в основе управления схемами в Schema Registry лежит принцип версионирования. Каждый раз, когда производится изменение в схеме, оно регистрируется как новая версия в Registry. Это позволяет не только сохранять исторические версии схем, но и обеспечивать управление совместимостью между ними. Schema Registry поддерживает четыре режима совместимости: NONE, BACKWARD, FORWARD, и FULL. Например, режим гарантирует, что новые данные могут быть прочитаны старыми схемами, что критически важно для обеспечения стабильности систем при разработке и масштабировании.


var schemaString = "{\\"namespace\\": \\"example.avro\\", \\"type\\": \\"record\\", " +
                       "\\"name\\": \\"User\\", \\"fields\\": [{\\"name\\": \\"name\\", \\"type\\": \\"string\\"}]}";
var parser = new Schema.Parser();
var avroSchema = parser.parse(schemaString);
var client = new CachedSchemaRegistryClient("<http://localhost:8081>", 10);
int registeredSchemaId = client.register("user-value", new AvroSchema(avroSchema));
// Установка режима совместимости
client.updateCompatibility("user-value", "BACKWARD");
System.out.println("Schema registered with ID: " + registeredSchemaId);

В этом примере мы создаем новую схему для записей пользователя, регистрируем ее в Schema Registry и устанавливаем режим совместимости на BACKWARD . Это обеспечивает, что все последующие версии схемы будут совместимы с предыдущими версиями, что позволяет старым приложениям корректно обрабатывать данные, произведенные новыми версиями приложений.

В данной статье мы рассмотрели ключевую роль Schema Registry в экосистеме Apache Kafka, начиная с обеспечения совместимости и версионирования схем до интеграции с API и сериализаторами/десериализаторами. Schema Registry служит важным инструментом для управления схемами сообщений, что необходимо для поддержания целостности и надежности данных в системах реального времени. Он предоставляет механизмы для обеспечения обратной и вперёд совместимости схем, что позволяет системам легко адаптироваться к изменениям без риска сбоев. Централизованное управление схемами через Schema Registry упрощает разработку и обслуживание распределенных систем, повышая их устойчивость и эффективность.

Комментарии (14)


  1. kimi44
    29.04.2024 10:10

    Добрый день, очень интересно. Можете рассказать насколько внедрение схем влияет на пропускную способность или насколько оно увеличивает накладные расходы на проверку? Также интересно схемы складываются прямо в кафку в отдельный топик или сервис со схемами является отдельным хранилищем в которое кафка по апи ходит или еще как-то? Если данные летят в старом deprecated формате, то подразумевается, что мы как-то записываем логи или метрики об этом? Чтобы понять уменьшается ли у нас объем таких сообщений и можем ли мы убивать обратную совместимость или как мониторинг осуществляется на соответствие схемам?


    1. temirlan100 Автор
      29.04.2024 10:10

      насколько внедрение схем влияет на пропускную способность или насколько оно увеличивает накладные расходы на проверку?

      Мы наблюдали, но просадки в performance не заметили

      сервис со схемами является отдельным хранилищем в которое кафка по апи ходит

      именно так, но мысль про отдельный топик очень интересна!

      Если данные летят в старом deprecated формате, то подразумевается, что мы как-то записываем логи или метрики об этом?

      Логирование происходит в сервисе и в самих логах Kafka

      Чтобы понять уменьшается ли у нас объем таких сообщений и можем ли мы убивать обратную совместимость или как мониторинг осуществляется на соответствие схемам?

      Не уверен что до конца понял Ваш вопрос, есть топики с обязательным требованиям к обратной совместимости и если там что то Deprecated, происходил переезд в новый топик


      1. kimi44
        29.04.2024 10:10
        +1

        Не уверен что до конца понял Ваш вопрос, есть топики с обязательным требованиям к обратной совместимости и если там что то Deprecated, происходил переезд в новый топик

        Я вероятно не понял что есть режим совместимости. Есть v1 и v2 какой-то схемы. Совместимость обеспечивается тем, что kafka преобразует v1 в v2 каким-то внутренним механизмом, если ей на вход прийдет v1? А если в v1 данных в принципе для такого преобразования не хватает.
        Поэтому я решил, что вряд ли это имеется ввиду. Наверное просто у нас две спеки v1 и v2 обе считаются валидными, а уметь работать с обеими версиями это уже вопрос консьюмеров. Тогда предполагаю, что v1 можно поставить deprecated и потихоньку его вымывать по мере того, как продьюсеры v1 перестают генерить и вместо него генерят v2. Тогда для v1 должна собираться метрика, которая показывает какой объем до сих пор данных в deprecated v1 схеме продолжает приходить. Чтобы отловить момент, когда их начнет приходить 0 и убрать вообще поддержку схемы v1. Какая-то такая мысль была, но не знаю поддерживает ли сам процесс верификации на схему отправку метрик. Наверное нет и это опять же вопрос консьюмеров.


        1. temirlan100 Автор
          29.04.2024 10:10

          Совместимость обеспечивается тем, что kafka преобразует v1 в v2 каким-то внутренним механизмом, если ей на вход прийдет v1?

          Просто алерты которые через логи можно как то аггрегировать и анализировать в дальнейшем, но чтобы слету как-то преобразовать, такого не знаю и не встречал в своей практики =)


          1. kimi44
            29.04.2024 10:10

            Я начал еще вокруг этой темы смотреть чего изучить и попал на курс Akka Classic Serialization. Там ребята нагнетают, что у Java сериализации какой-то ужасающий перфоманс, но вот вы говорите, что даже не заметили изменений, а плюсы проверки схемы уже получили. Но непонятно насколько в их утверждении есть рациональное зерно и при какой нагрузке это стрелять начинает. Но опять же, там Akka и Scala, непонятно насколько оно надо вообще.


        1. bibmaster
          29.04.2024 10:10

          Я немножко не из мира java, но, насколько я представляю, дело обстоит примерно так: клиент имеет свою схему, через registry (регистрируя) он может получить её id. Если id схемы в получаемом сообщении совпадает с клиентской, всё просто. Если нет, клиент запрашивает схему сообщения из registry и если она совместима должен построить конвертер, который декодирует сообщение по старой (или может быть новой) схеме и по правилам эволюции схемы преобразует его в объект текущей схемы.


    1. bibmaster
      29.04.2024 10:10

      Абсолютно никак не влияет. Брокеру без разницы что в него кладут, он ничего не проверяет. Schema registry это то, с чем работают клиенты через отдельный api. Оно может быть вообще в другой кафке если что, кафка для этого сервиса просто хранилище. Режимы совместимости в avro просто накладывают ограничения на возможности модификации схем, так чтобы клиент имея локально схему версии X мог декодировать сообщение закодированное со схемой версии Y. Соответственно schema registry при обновлении схемы может такие ограничения проверять и не даст изменить схему если совместимость нарушается.


  1. kimi44
    29.04.2024 10:10

    А как быстро стартануть то? Какой-то есть материал, который вам сильно помог? В каком виде описываются схемы? Как их закидывать? UI или по API? Какой-нибудь толковый 101 очень бы не помешал, но возможно это идея для следующей статьи ))


    1. kimi44
      29.04.2024 10:10

      Еще бы понять конкретно что avro решает. С моей точки зрения есть топики и в них можно писать произвольный JSON. Этот JSON можно валидировать, но почему тогда все время речь о сериализации и десериализации в статьях про avro. Он же уже лежит в json формате в топике =) Но возможно мне просто стоит подробнее почитать про эту технологию


      1. bibmaster
        29.04.2024 10:10

        Если у вас JSON, вам скорее всего не нужен schema registry. Это вообще штука заточенная чисто под avro. Дело в том что это формат сериализации, в котором отсутствуют метаданные. Там нет названий полей как в json или тегов как в protobuf. Это просто бинарно закодированные подряд все поля структуры. И не зная с какой схемой было закодировано сообщение, декодировать его невозможно. Поэтому avro без schema registry существовать практически не может. Выбирая avro надо всегда иметь в виду что все producers и consumers становятся зависимыми от schema registry. И сам он становится критичной частью всей инфраструктуры.


        1. kimi44
          29.04.2024 10:10

          У меня действительно во все топики складывается JSON, но есть потребность проверять, что в определенные топики залетают только JSON определенного вида(содержащие определенные ключи, иногда надо проверить и value). Если есть какой-то механизм, который позволил бы проверять данные и писать ошибки в логи, если что-то не удовлетворяет условиям это было бы идеально. Еще идеальнее, если бы это все можно было хранить прямо по месту в kafka топиках и желательно без подключения внешнего хранилища спек. Но я не знаю существует ли для кафка что-то похожее на мое описание, но если вы знаете такое, то напишите, пожалуйста.


    1. temirlan100 Автор
      29.04.2024 10:10

      В каком виде описываются схемы?

      {
         "type" : "record",
         "namespace" : "someSchema",
         "name" : "Employee",
         "fields" : [
            { "name" : "Name" , "type" : "string" },
            { "name" : "Age" , "type" : "int" }
         ]
      }

      А как быстро стартануть то? Какой-то есть материал, который вам сильно помог?

      Сейчас и не вспомню к сожалению) все через документацию

      Как их закидывать? UI или по API?

      API =) UI тулзы не смотрел


  1. ptr128
    29.04.2024 10:10

    Сделать реестр схем первичным источником мне не удалось, так как из схемы автоматически вычищаются комментарии. А если одна и та же схема используется не только в Kafka, то это уже проблема. Например, эти комментарии нужны Swagger.


  1. SAPetrovich77
    29.04.2024 10:10

    Странная статья. В самом начале приведен пример настройки поставщика, в котором почему-то задаются свойства получателя и десереализаторв используемых получателем. Дальше те-же непонятки... Ну и тема как реально реализуются декларируемые прелести использования avro как мне кажется совсем не раскрыта...