Привет, Хабр! Меня зовут Николай Пискунов, я ведущий разработчик в подразделении Big Data.
В блоге beeline cloud я рассказывал о Spring Data JPA, Hibernate, делился личными наблюдениями, как облегчить себе жизнь при написании тестов. Сегодня речь о другом: расскажу, как инжектить в статические поля. Как всегда — на примерах. Поехали.
На практике десериализаторов, представленных в ядре Spring, хватает в 99% случаев. Но бывают ситуации, когда всё же требуется описать свою логику предобработки входящего сообщения.
Для наглядности предлагаю рассмотреть гипотетический случай, когда на вход Kafka подается строка лога с задаваемым разделителем. В этой строке нас интересует лишь часть сообщения: мы будем делить строку по определенному символу и собирать из получившегося массива новую строку.
Задавать разделитель и индексы в файле application.properties мы планируем так:
kafka.message.deserializer.delimiter=,
kafka.message.deserializer.indexes=0,2,3,5
Сразу скажу, что org.apache.kafka.common.serialization.Deserializer и все его реализации не знают о спринговом контексте ничего — от слова совсем. Даже если попытаться сделать из него бин, пометив соответствующей аннотацией, например @Component.
Для этого напишем сам Deserializer и попробуем прямо в него заинжектить требуемые настройки, используя спринговую аннотацию @Value:
@Slf4j
@Component
public class SmartStringDeserializer implements Deserializer<String> {
@Value("${kafka.message.deserializer.delimiter}")
String delimiter;
@Value("${kafka.message.deserializer.indexes}")
List<Integer> indexes;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Deserializer.super.configure(configs, isKey);
}
@Override
public String deserialize(String s, byte[] bytes) {
return setValueByIndex(bytes);
}
@Override
public void close() {
Deserializer.super.close();
}
private String setValueByIndex(byte[] bytes) {
String msg = new String(bytes, StandardCharsets.UTF_8);
LOGGER.debug("Income message: {}", msg);
if (indexes.getFirst() == -1) {
return msg;
}
String[] incomeMessage = msg.split(delimiter);
String formattedIncomeMessage = indexes.stream()
.map(index -> incomeMessage[index])
.collect(Collectors.joining(delimiter));
LOGGER.debug("Consumed message: {}", formattedIncomeMessage);
return formattedIncomeMessage;
}
}
К сожалению, нам вернется null при попытке получить значения:
А если мы проставим стандартные значения в @Values, результат будет таким же:
@Value("${kafka.message.deserializer.delimiter:,}")
Но можно использовать статические поля, описав их в соседнем классе. Вот только надо придумать, как в них заинжектить данные из спрингового контекста. Конечно, это должен быть спринговый бин, например @Component, и в нем мы определим статические поля, включая поля, которые будут заполняться данными из properties:
@Component
public class DeserializerUtils {
public static String DELIMITER;
public static List<Integer> INDEXES;
private String delimiter;
private List<Integer> indexes;
…
}
Теперь напишем сеттеры, они-то и будут присваивать значения статическим полям:
@Component
public class DeserializerUtils {
public static String DELIMITER;
public static List<Integer> INDEXES;
private String delimiter;
private List<Integer> indexes;
@Value("${kafka.message.deserializer.delimiter:,}")
private void setDelimiter(String delimiter){
DeserializerUtils.DELIMITER = delimiter;
}
@Value("${kafka.message.deserializer.indexes:-1}")
private void setIndexes(List<Integer> indexes){
DeserializerUtils.INDEXES = indexes;
}
}
Важно помнить, что @Values необходимо навешивать над методами, иначе не заработает. И, если вы решите обновить контекст без перезапуска приложения, например, поправив значения в файле конфигураций, обновятся и статические поля.
Теперь перепишем сам десериалайзер:
@Slf4j
public class SmartStringDeserializer implements Deserializer<String> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Deserializer.super.configure(configs, isKey);
}
@Override
public String deserialize(String s, byte[] bytes) {
return setValueByIndex(bytes);
}
@Override
public void close() {
Deserializer.super.close();
}
private String setValueByIndex(byte[] bytes) {
String delimiter = DeserializerUtils.DELIMITER;
List<Integer> indexes = DeserializerUtils.INDEXES;
String msg = new String(bytes, StandardCharsets.UTF_8);
LOGGER.debug("Income message: {}", msg);
if (indexes.getFirst() == -1) {
return msg;
}
String[] incomeMessage = msg.split(delimiter);
String formattedIncomeMessage = indexes.stream()
.map(index -> incomeMessage[index])
.collect(Collectors.joining(delimiter));
LOGGER.debug("Consumed message: {}", formattedIncomeMessage);
return formattedIncomeMessage;
}
}
А вот теперь мы можем конфигурировать Deserializer, используя спринговый контекст:
У такого подхода есть свои минусы. По факту у нас дублируются данные в хипе, которые не удалит GC ввиду того, что первые хранятся в контексте спринга, вторые — в статических полях. Поэтому старайтесь инжектить конкретные бины и не делайте так:
public static ApplicationContext CONTEXT;
private ApplicationContext context;
private void setIndexes(ApplicationContext context){
DeserializerUtils.CONTEXT = context;
}
beeline cloud — secure cloud provider. Разрабатываем облачные решения, чтобы вы предоставляли клиентам лучшие сервисы.
Комментарии (3)
Duke_Raven
22.05.2024 11:55Если вопрос больше о том, как прокинуть настройки из конфига в кастомный десериалайзер, то это можно сделать просто объявив эти настройки в ветке
spring.kafka.consumer.properties
При создании десериалайзера Spring вызовет
configure
и передаст их туда.Как с ними работает Spring можно посомтреть например в
org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
Beholder
Можно объяснить, почему?