В прошлой статье я писал о том, что такое Schema Registry и как используется в Apache Kafka. Сегодня я хочу углубиться в тему и описать поведение системы при различных типах совместимости . Правильное понимание и применение этих типов совместимости поможет обеспечить стабильность и гибкость системы при изменениях структуры данных.
Для удобства, ниже представлена таблица, которая обобщает информацию о различных типах совместимости схем в Confluent Schema Registry:
Тип совместимости |
Разрешенные изменения |
Проверка с какими схемами |
Обновлять в первую очередь |
---|---|---|---|
BACKWARD |
- Удаление полей |
Последняя версия |
Consumer |
BACKWARD_TRANSITIVE |
- Удаление полей |
Все предыдущие версии |
Consumer |
FORWARD |
- Добавление полей |
Последняя версия |
Producer |
FORWARD_TRANSITIVE |
- Добавление полей |
Все предыдущие версии |
Producer |
FULL |
- Добавление опциональных полей |
Последняя версия |
В любом порядке |
FULL_TRANSITIVE |
- Добавление опциональных полей |
Все предыдущие версии |
В любом порядке |
NONE |
- Все изменения разрешены |
Проверка совместимости отключена |
Зависит от ситуации |
Теперь давайте рассмотрим более детальнее типы совместимости, которые помогут поддерживать стабильность ваших данных и сервисов.
Confluent Schema Registry поддерживает несколько типов совместимости, включая:
None
Backward Compatibility
Forward Compatibility
Full Compatibility
Backward Transitive Compatibility
Forward Transitive Compatibility
Full Transitive Compatibility
Разберем этих 7 самураев совместимости и посмотрим, как каждый из них работает на практике.
Начнем с самого простого и неинтересного тривиального типа – None. Этот тип совместимости означает, что схема может быть изменена без каких-либо ограничений. При использовании None Schema Registry не проверяет новые схемы на совместимость с предыдущими версиями. Если consumer и producer данных ожидают различные структуры данных, то не трудно догадаться какие последствия могут возникнуть.
Теперь разберем более сложные и интересные типы совместимости, которые помогут вам поддерживать стабильность ваших данных и сервисов.
Backward Compatibility
Обратная совместимость означает, что новые версии схем должны быть совместимы с предыдущими версиями. Это означает, что данные, записанные с использованием старых версий схем, могут быть прочитаны новыми версиями схем без ошибок. Это полезно для ситуаций, когда у вас есть старые данные, которые должны быть доступны для новых приложений.
Producer: Новые данные, которые записываются с использованием новой версии схемы, должны быть совместимы с предыдущими версиями. Producer использует схему, зарегистрированную в Schema Registry, и если новая схема несовместима с предыдущими, Producer получит ошибку при попытке отправить данные.
Consumer: Потребители могут продолжать читать данные, используя новую версию схемы, даже если данные были записаны с использованием старой версии. Consumer может столкнуться с ситуацией, когда новые поля, добавленные в схему, отсутствуют в старых данных.
// Старая версия схемы
var oldSchema = "{\\"type\\":\\"record\\",\\"name\\":\\"User\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\"}]}";
// Новая версия схемы с добавленным полем "age"
var newSchema = "{\\"type\\":\\"record\\",\\"name\\":\\"User\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\"},{\\"name\\":\\"age\\",\\"type\\":\\"int\\",\\"default\\":0}]}";
// Producer с новой схемой
var producer = new KafkaProducer<String, GenericRecord>(producerProps);
var schema = new Schema.Parser().parse(newSchema);
var user1 = new GenericData.Record(schema);
user1.put("name", "Alice");
user1.put("age", 30); // Новое поле "age"
producer.send(new ProducerRecord<>("users", "key1", user1));
// Consumer с новой схемой, читает данные, записанные со старой схемой
var consumer = new KafkaConsumer<String, GenericRecord>(consumerProps);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord user = record.value();
var name = user.get("name").toString();
// Старые данные могут не содержать "age", поэтому проверяем наличие
var age = user.get("age") != null ? (Integer) user.get("age") : 0; // Устанавливаем значение по умолчанию для старых данных
}
}
Producer:
➕ Возможность добавлять новые поля без нарушения существующих данных.
➕ Позволяет расширять схему с минимальными изменениями в коде.
➖ Невозможность удалить поля без предоставления значения по умолчанию.
➖ Необходимо тщательное управление версионированием схем, чтобы избежать ошибок совместимости.
Consumer:
➕ Гарантия того, что данные, записанные старыми версиями схемы, будут читаться без ошибок.
➕ Упрощает процесс миграции на новые версии схемы без изменения логики обработки данных.
➖ Старые данные могут не содержать новые поля, что требует дополнительных проверок и обработки.
➖ Ограниченная гибкость при изменении структуры данных, поскольку новые поля должны быть совместимы с предыдущими версиями.
Forward Compatibility
Прямая совместимость означает, что старые версии схем должны быть совместимы с новыми версиями. Это означает, что данные, записанные с использованием новых версий схем, могут быть прочитаны старыми версиями схем без ошибок.
Producer: Producer может добавлять новые поля и удалять опциональные поля. Если схема, зарегистрированная в Schema Registry, несовместима с предыдущими, Producer получит ошибку при попытке отправить данные.
Consumer: Потребители могут продолжать читать данные, используя старую версию схемы, даже если данные были записаны с использованием новой версии.
// Старая версия схемы
var oldSchema = "{\\"type\\":\\"record\\",\\"name\\":\\"User\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\"}]}";
// Новая версия схемы с добавленным полем "age"
var newSchema = "{\\"type\\":\\"record\\",\\"name\\":\\"User\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\"},{\\"name\\":\\"age\\",\\"type\\":\\"int\\"}]}";
// Producer с новой схемой
var producer = new KafkaProducer<String, GenericRecord>(producerProps);
var schema = new Schema.Parser().parse(newSchema);
var user1 = new GenericData.Record(schema);
user1.put("name", "Alice");
user1.put("age", 30); // Новое поле "age"
producer.send(new ProducerRecord<>("users", "key1", user1));
// Consumer со старой схемой, читает данные, записанные с новой схемой
var consumer = new KafkaConsumer<String, GenericRecord>(consumerProps);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord user = record.value();
String name = user.get("name").toString();
// Поскольку consumer использует старую схему, он игнорирует поле "age"
System.out.println("User: " + name);
}
}
Producer:
➕ Возможность добавлять новые поля и удалять опциональные поля.
➕ Позволяет расширять схему с минимальными изменениями в коде.
➖ Producer должен гарантировать, что новые данные могут быть прочитаны старыми потребителями.
➖ Необходимо тщательное управление версионированием схем, чтобы избежать ошибок совместимости.
Consumer:
➕ Гарантия того, что старые версии схем могут читать новые данные без ошибок.
➕ Упрощает процесс миграции на новые версии схемы без изменения логики обработки данных.
➖ Старые потребители могут игнорировать новые поля, что может требовать дополнительных проверок и обработки.
➖ Ограниченная гибкость при изменении структуры данных, поскольку новые данные должны быть совместимы со старыми версиями схем.
Full Compatibility
Полная совместимость означает, что новая версия схемы должна быть совместима как с предыдущими версиями (backward compatibility), так и с будущими версиями (forward compatibility). Это обеспечивает максимальную гибкость и стабильность при работе с данными, так как гарантирует, что данные, записанные как старыми, так и новыми версиями схем, могут быть прочитаны без ошибок любой версией схемы. Самый любимый и часто используемый мною вариант!
Producer: Новая схема должна позволять запись данных, которые могут быть прочитаны как старыми, так и новыми потребителями. Producer может добавлять опциональные поля и удалять опциональные поля. Если схема, зарегистрированная в Schema Registry, несовместима с предыдущими и будущими версиями, Producer получит ошибку при попытке отправить данные.
Consumer: Потребители могут читать данные, записанные как старыми, так и новыми версиями схем. Это требует тщательной проверки, чтобы новые поля были опциональными и имели значения по умолчанию, чтобы избежать ошибок.
var oldSchema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
var newSchema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"null\", \"int\"], \"default\":null}]}";
// Producer с новой схемой
var producer = new KafkaProducer<String, GenericRecord>(producerProps);
var schema = new Schema.Parser().parse(newSchema);
var user1 = new GenericData.Record(schema);
user1.put("name", "Alice");
// Новое поле "age" может быть не указано, так как оно опциональное
producer.send(new ProducerRecord<>("users", "key1", user1));
// Consumer с новой схемой, читает данные, записанные с использованием старой схемы
var consumer = new KafkaConsumer<String, GenericRecord>(consumerProps);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
var user = record.value();
var name = user.get("name").toString();
// Старые данные могут не содержать "age", поэтому проверяем наличие
var age = user.get("age") != null ? (Integer) user.get("age") : null; // Устанавливаем значение по умолчанию для старых данных
System.out.println("User: " + name + ", Age: " + (age != null ? age : "N/A"));
}
}
Producer:
➕ Возможность добавлять опциональные поля и удалять опциональные поля.
➕ Гарантия, что данные будут совместимы как с предыдущими, так и с будущими версиями схем.
➖ Ограничения на изменения, чтобы гарантировать совместимость с предыдущими и будущими версиями.
Consumer:
➕ Гарантия, что данные, записанные старыми и новыми версиями схем, будут читаться без ошибок.
➕ Упрощает процесс миграции на новые версии схемы без изменения логики обработки данных.
➖ Требуется проверка и обработка опциональных полей, чтобы избежать ошибок при чтении данных.
➖ Ограниченная гибкость при изменении структуры данных, поскольку новые данные должны быть совместимы как с предыдущими, так и с будущими версиями схем.
Все типы совместимости выше не учитывают историчность схем, как бы смотрим на шаг назад или вперед поэтому следующие форматы на мой взгляд более серьезные которые требует тщательного анализа прежде чем их тащить к себе в production! Ниже я не буду описывать все транзитивные зависимости, а лучше опишу самую на мой взгляд осторожную и безопасную стратегию, остальные думаю будут очевидны складывая материал далее и что уже есть в статье!
Full Transitive Compatibility
Полная транзитивная совместимость — это наиболее строгий тип совместимости схем. Он требует, чтобы новая версия схемы была совместима со всеми предыдущими и будущими версиями. Это означает, что данные, записанные любой версией схемы, могут быть прочитаны любой другой версией схемы без ошибок. Этот тип совместимости обеспечивает максимальную стабильность данных.
Producer: новая схема должна позволять запись данных, которые могут быть прочитаны как старыми, так и новыми потребителями. Producer может добавлять опциональные поля и удалять опциональные поля. Если схема, зарегистрированная в Schema Registry, несовместима с любой из предыдущих и будущих версий, Producer получит ошибку при попытке отправить данные.
Consumer: могут читать данные, записанные как старыми, так и новыми версиями схем. Это требует тщательной проверки, чтобы новые поля были опциональными и имели значения по умолчанию, чтобы избежать ошибок.
var schemaV1 = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
var schemaV2 = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"null\", \"int\"],\"default\":null}]}";
var schemaV3 = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"null\", \"int\"],\"default\":null},{\"name\":\"email\",\"type\":[\"null\", \"string\"],\"default\":null}]}";
// Producer с третьей версией схемы
var producer = new KafkaProducer<String, GenericRecord>(producerProps);
var schema = new Schema.Parser().parse(schemaV3);
var user1 = new GenericData.Record(schema);
user1.put("name", "Alice");
user1.put("age", 30); // Опциональное поле "age"
user1.put("email", "alice@example.com"); // Опциональное поле "email"
producer.send(new ProducerRecord<>("users", "key1", user1));
// Consumer с третьей версией схемы, читает данные, записанные со всеми предыдущими версиями схемы
var consumer = new KafkaConsumer<String, GenericRecord>(consumerProps);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
var user = record.value();
var name = user.get("name").toString();
// Старые данные могут не содержать "age" и "email", поэтому проверяем наличие
var age = user.get("age") != null ? (Integer) user.get("age") : null;
var email = user.get("email") != null ? user.get("email").toString() : "N/A";
System.out.println("User: " + name + ", Age: " + (age != null ? age : "N/A") + ", Email: " + email);
}
}
Producer:
➕ Гарантия, что данные, записанные с использованием новой версии схемы, будут совместимы со всеми предыдущими и будущими версиями.
➕ Обеспечивает максимальную стабильность и совместимость данных.
➖ Ограничения на изменения, чтобы гарантировать совместимость со всеми предыдущими и будущими версиями схем.
➖ Требует тщательного управления версиями схем и их изменениями.
Для Consumer:
➕ Гарантия, что данные, записанные любой предыдущей и будущей версией схемы, будут читаться без ошибок.
➕ Упрощает процесс миграции на новые версии схемы без изменения логики обработки данных.
➖ Необходимость проверки и обработки данных, чтобы избежать ошибок при чтении данных с разных версий схем.
➖ Ограниченная гибкость при изменении структуры данных, поскольку новые данные должны быть совместимы со всеми предыдущими и будущими версиями схем.
Каждая стратегия эволюции схем в Confluent Schema Registry имеет свои плюсы и минусы. Выбор подходящей стратегии зависит от ваших потребностей, стабильности данных и требуемой гибкости! Вот краткое резюме рассмотренных стратегий:
Backward Compatibility: Обеспечивает совместимость с предыдущими версиями схемы. Отлично подходит, когда нужно минимизировать изменения в данных.
Forward Compatibility: Обеспечивает совместимость с будущими версиями схемы. Полезно при частых обновлениях схем.
Full Compatibility: Объединяет плюсы Backward и Forward Compatibility, обеспечивая совместимость с предыдущими и будущими версиями схемы. Максимум гибкости и стабильности!
Full Transitive Compatibility: Максимально строгий и надежный тип совместимости, обеспечивающий, что данные любой версии схемы могут быть прочитаны любой другой версией схемы. Абсолютный чемпион по совместимости!
Чтобы облегчить работу с Confluent Schema Registry, вот несколько полезных ресурсов и инструментов:
Open-source инструмент с удобным веб-интерфейсом для работы со схемами. Schema Registry UI на GitHub
-
Еще один open-source инструмент для управления Apache Kafka, включающий возможности работы со схемами. AKHQ на GitHub