Привет, Хабр! Сегодня я расскажу о своем опыте создания сервиса для отправки сообщений в Apache Kafka с использованием Spring Boot, аутентификацией SASL/Kerberos и применением Avro Schema Registry для продюсера. В процессе разработки я столкнулся с рядом проблем, решение которых потребовало усилий и времени. Надеюсь, мой опыт будет полезен разработчикам, которым предстоит решать подобные задачи.
Ведение
По техзаданию необходимо было создать сервис, который в зависимости от топика отправлял бы сообщения или на один инстанс Kafka (строку, с простой авторизацией с помощью SSL), или на другой, но уже с сериализацией и аутентификацией через Kerberos.
Обмен данными с обоими серверами Kafka использует шифрование с помощью сертификатов, второй использует дополнительную аутентификацию клиента с помощью Kerberos.
Приложение деплоится на Томкат.
Немного теории
Apache Kafka — популярная платформа потоковой передачи данных, часто используемая для создания высоконагруженных распределенных систем. Для обеспечения безопасности могут применятся механизмы аутентификации SASL/Kerberos. Avro Schema Registry используется для управления схемами данных и обеспечивает совместимость версий сообщений и валидации отправляемых\принимаемых данных.
Kerberos — это протокол аутентификации, разработанный для обеспечения безопасного доступа к сети. Он работает на основе системы тикетов, предоставляемых центральным сервером аутентификации (KDC — Key Distribution Center).
SSL (Secure Sockets Layer) используется для шифрования данных и аутентификации сервера и/или клиента. Чтобы настроить SSL для соединения с Kafka с помощью JKS (Java KeyStore), необходимо создать и настроить truststore
и keystore
файлы. Truststore
содержит сертификаты доверенных удостоверяющих центров (CA), а keystore
содержит закрытые ключи и сертификаты для идентификации сервера или клиента. В Kafka клиентской конфигурации необходимо указать местоположения этих файлов и их пароли.
Настройка Spring Boot и зависимостей
Первым шагом добавляем необходимые зависимости в pom.xml
:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>6.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
Настройка SSL
В конфигурации соединения необходимо указать путь к jks
файлу и пароля к нему. В примере ниже это producerRetailFactory
.
Настройка Kerberos
Настройка Kerberos требует наличия конфигурационного файла krb5.conf
и указания пути к keytab
. Файл krb5.conf
должен содержать информацию о KDC (Key Distribution Center). В примере ниже это producerMainFactory
.
[libdefaults]
default_realm = TEST_DOMAIN.TRUS.TCOMPANY
[realms]
TEST_DOMAIN.TRUS.TCOMPANY = {
kdc = DC00.TEST_DOMAIN.trus.tCOMPANY
admin_server = S000.TEST_DOMAIN.trus.tCOMPANY
kdc = DC01.TEST_DOMAIN.TRUS.TCOMPANY
}
[domain_realm]
.TEST_DOMAIN.trus.tCOMPANY = TEST_DOMAIN.TRUS.TCOMPANY
TEST_DOMAIN.trus.tCOMPANY = TEST_DOMAIN.TRUS.TCOMPANY
keytab
это двоичный файл содержащий пары Kerberos принципалов и их ключей (полученных с использованием Kerberos пароля). Эти файлы используются для аутентификации, без ввода пароля.
Обычно эти данные предоставляются администраторами сервисов.
Конфигурация для двух серверов Kafka
Так как у нас два разных брокера Kafka, требующих разные типы аутентификации, мы создадим специальный класс для конфигурирования. Один будет использовать SSL для аутентификации, другой — SSL, SASL/Kerberos.
Важно, в классе конфигурации, путь к
krb5.conf
должен быть отдельно указан (иначе будет использован системный) и указывать его нужно не через props (как остальные параметры), а через установку переменной окружения.
System.setProperty("java.security.krb5.conf", krb5ConfFile);
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {
private final Environment environment;
private final JMServiceDao jmServiceDao;
public KafkaConfig(Environment environment, JMServiceDao jmServiceDao) {
this.environment = environment;
this.jmServiceDao = jmServiceDao;
}
@Bean
public ProducerFactory<String, Object> producerRetailFactory() {
Map<String, Object> props = new HashMap<>();
String hostname = Optional.ofNullable(System.getenv("HOSTNAME")).filter(Predicate.not(String::isBlank)).orElse(environment.getProperty("COMPUTERNAME", "Localhost"));
props.put(CLIENT_ID_CONFIG, hostname + "_" + environment.getProperty("spring.application.name"));
props.put(BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("KafkaRetailBrokerHost", jmServiceDao.getKafkaRetailBrokerHost()));
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.truststore.location", environment.getProperty("KafkaRetailSSLKeyStoreLocation" + "kafka.retail.truststore.jks",
environment.getProperty("catalina.base") +
jmServiceDao.getKafkaRetailSSLKeyStoreLocation()));
props.put("ssl.truststore.password", environment.getProperty("KafkaRetailSSLKeyStorePwd", jmServiceDao.getKafkaRetailSSLKeyStorePassword()));
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public ProducerFactory<String, Object> producerMainFactory() {
// важно! путь к keyTab должен быть экраннированным для windows path
String keyTabFile = Paths.get(environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainKeytabLocation()).toAbsolutePath().normalize().toString().replace("\\", "/");
String krb5ConfFile = Paths.get(environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainKrb5ConfLocation()).toAbsolutePath().normalize().toString().replace("\\", "/");
String jksFile = Paths.get(environment.getProperty("KafkaMainSSLKeyStoreLocation" + "kafka.retail.truststore.jks",
environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainSSLKeyStoreLocation())).toAbsolutePath().normalize().toString().replace("\\", "/");
Map<String, Object> props = new HashMap<>();
String hostname = Optional.ofNullable(environment.getProperty("HOSTNAME")).filter(Predicate.not(String::isBlank)).orElse(environment.getProperty("COMPUTERNAME", "Localhost"));
props.put(CLIENT_ID_CONFIG, hostname + "_" + environment.getProperty("spring.application.name"));
props.put(BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("KafkaMainBrokerHost", jmServiceDao.getKafkaMainBrokerHost()));
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("specific.avro.reader", "true");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.jaas.config", """
com.sun.security.auth.module.Krb5LoginModule required serviceName="kafka"
useKeyTab=true storeKey=true keyTab="%s" principal="%s";
""".formatted(keyTabFile, jmServiceDao.getKafkaMainPrincipal()));
props.put("session.timeout.ms", "45000");
props.put("schema.registry.url", jmServiceDao.getKafkaMainSchemaRegistryUrl());
// параметр устанавливается не через props
System.setProperty("java.security.krb5.conf", krb5ConfFile);
props.put("security.protocol", "SASL_SSL");
props.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1,TLSv1");
props.put("ssl.truststore.location", environment.getProperty("KafkaMainSSLKeyStoreLocation" + "kafka.main.truststore.jks", jksFile));
props.put("ssl.truststore.password", environment.getProperty("KafkaMainSSLKeyStorePwd", jmServiceDao.getKafkaMainSSLKeyStorePassword()));
//log.debug("kafka props {}", props);
//log.debug("system env {}", System.getProperties());
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplateForRetail() {
return new KafkaTemplate<>(producerRetailFactory());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplateForMain() {
return new KafkaTemplate<>(producerMainFactory());
}
}
В этом классе необходимо обратить внимание (кроме обязательных параметров), на параметр sasl.jaas.config
и здесь я столкнулся с первой проблемой, оказалось, что нигде нет проверки на правильность пути к keyTab
файлу. Разработка велась в ОС Windows, и windows путь, например C:\ra_projects\Java\apache-tomcat-10.1.10\conf\keystore\tkafka.keytab
из-за отсутствия экранирования слешей (путь хранится в БД), преобразуется во внутренностях класса связанного с авторизацией, в C:a_projectsJava�pache-tomcat-10.1.10confkeystore kafka.keytab
, в Exception я получал только, что авторизация не удалась:
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
И только в режиме Debug я увидел кривой путь к файлу
keyTab
. Добавил экранирование. Так же завел issue.
Вторая проблема, я рандомно получал ошибку:
javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will retry.
Оказалось, что мне предоставили неправильный файл
krb5.conf
, содержащий неправильные пути к KDC доменам. А в последствии, что распарсенный файлkrb5.conf
хранится кэшированным, и даже если его содержимое поменять, он не будет перечитан до перезапуска Tomcat сервера, на который деплоилось мое приложение.
Сериализация данных
В конфигурационном классе KafkaConfig
необходимо указать домен к хранилище схем:
props.put("schema.registry.url", jmServiceDao.getKafkaMainSchemaRegistryUrl());
Для отправки данных в Kafka был написан класс, который использует две конфигурации в зависимости от данных, которые были получены для отправки:
@Service
@Slf4j
public class JMServiceImpl implements JMService {
private enum KafkaType {
KAFKA_RETAIL,
KAFKA_MAIN
}
private final JMServiceDao jmServiceDao;
// авторизация через SSL, тут продюсер для отправки "сырых" данных
private final KafkaTemplate<String, Object> kafkaTemplateForRetail;
// авторизация через SSL,Kerberos, тут продюсер для отправки отправки сериализованных данных
private final KafkaTemplate<String, Object> kafkaTemplateForMain;
@Autowired
public JMServiceImpl(JMServiceDao plcPackageDao,
@Qualifier("kafkaTemplateForRetail") KafkaTemplate<String, Object> kafkaTemplateForRetail,
@Qualifier("kafkaTemplateForMain") KafkaTemplate<String, Object> kafkaTemplateForMain) {
this.jmServiceDao = plcPackageDao;
this.kafkaTemplateForRetail = kafkaTemplateForRetail;
this.kafkaTemplateForMain = kafkaTemplateForMain;
}
@Override
public Mono<KafkaSendOKResponse> sendMessage(QueueMessageId queueMessageId) throws IOException {
log.info("Попытка отправить сообщение queueMessageId = {}", queueMessageId.getQueueMessageId());
QueueMessage message = jmServiceDao.getQueueMessage(queueMessageId.getQueueMessageId());
if (message == null) {
throw new ResponseStatusException(NOT_FOUND, "В теле запроса указан несуществующий идентификатор сообщения в очереди");
} else {
KafkaType messagingSystem = KafkaType.valueOf(message.getMessagingSystemCode());
Object messageForSend;
KafkaTemplate<String, Object> kafkaTemplate = switch (messagingSystem) {
case KAFKA_MAIN -> {
ObjectMapper objectMapper = new ObjectMapper();
//строка с данными в JSON
JsonNode rootNode = objectMapper.readTree(message.getMessageData());
//нужна сериализация
SendPTI ptiMessage = new SendPTI();
ptiMessage.setSiebelId(rootNode.get("siebel_id").asText());
ptiMessage.setPti(ByteBuffer.wrap(rootNode.get("pti").asText().getBytes(StandardCharsets.US_ASCII)));
ptiMessage.setPtiDate(Instant.parse(rootNode.get("pti_date").asText()));
messageForSend = ptiMessage;
yield kafkaTemplateForMain;
}
case KAFKA_RETAIL -> {
//отправляем просто строку
messageForSend = message.getMessageData();
yield kafkaTemplateForRetail;
}
default -> throw new IllegalArgumentException("Неподдерживаемая система: " + messagingSystem);
};
log.debug("Отправляем в топик {} данные {}", message.getTopic(), messageForSend);
return Mono.fromFuture(kafkaTemplate.send(message.getTopic(), messageForSend))
.map(result -> {
log.debug("Результат {} отправки сообщения {}", result, queueMessageId.getQueueMessageId());
LocalDateTime localDateTime = result.getRecordMetadata().hasTimestamp()
? LocalDateTime.ofInstant(Instant.ofEpochMilli(result.getRecordMetadata().timestamp()), ZoneId.systemDefault())
: null;
return new KafkaSendOKResponse()
.setQueueMessageId(queueMessageId.getQueueMessageId())
.setSendStatus(1)
.setRecordMetadata(result.getRecordMetadata().toString())
.setSentDate(localDateTime);
})
.doOnError(ex -> {
log.error("Произошла ошибка {} при отправке сообщения {}", ex, queueMessageId.getQueueMessageId());
});
}
}
}
Выборка нужной конфигурации (бина) осуществляется с помощью аннотации @Qualifier. Для KAFKA_MAIN используется Avro сериализатор, для KAFKA_RETAIL отправляется строка.
Для генерации класса SendPTI
для сериализации отправляемых\принимаемых данных используется Maven плагин:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.3</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
В нем нужно указать путь к *.avcs
файлу, содержащему исходную схему, мне был предоставлен такой:
{
"type": "record",
"name": "SendPTI",
"namespace": "com.technology.kafkaproducer.jmessageservice.SendPTI",
"fields": [
{
"name": "siebel_id",
"type": "string"
},
{
"name": "pti",
"type": {
"type": "bytes",
"scale": 5,
"precision": 7,
"connect.version": 1,
"connect.parameters": {
"scale": "5",
"connect.decimal.precision": "7"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
},
{
"name": "pti_date",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
}
],
"connect.name": "ru.COMPANY.schema.SCORF.SendPTI"
}
При сборке проекта будет запускаться плагин и будет сгенерирован POJO класс, в моем примере выше это класс SendPTI.
Когда все уже было готово, я столкнулся с другой проблемой; доступ в сеть у меня осуществляется через рабочий прокси, но при попытке отправить сообщение в Kafka, происходила безуспешная попытка подключения к Schema Registry и приложение падало с ошибкой:
2024-05-29 11:09:15.754 [http-nio-8081-exec-2] ERROR i.c.k.s.client.rest.RestService - Failed to send HTTP request to endpoint: https://schema-kafka-test/subjects/SCORF.SendPTI-value/versions/latest/ java.io.IOException: Unable to tunnel through proxy. Proxy returns "HTTP/1.1 407 Proxy Authentication Required"
Решением стало добавление дополнительных параметром при запуске моего локального (разработческого) сервера Томкат:
-Dhttps.proxyHost=proxy.host
-Dhttps.proxyPort=8080
-Dhttps.proxyUser=user
-Dhttps.proxyPassword=superSecretPassword
После этого я смог успешно отправлять данные из одного сервиса на два разных сервера Kafka с разными типами авторизации и с сериализацией и без.
Мира!
Khamsha
А сколько времени ушло в итоге, чтобы все эти трудности преодолеть? Мы недавно возились с подобной задачей на питоне и докере, и потратили несколько вечеров троих инженеров
foxyrus Автор
Делал один, дня 4 чтобы подобрать все нужны параметры.