Привет, Хабр! Сегодня я расскажу о своем опыте создания сервиса для отправки сообщений в Apache Kafka с использованием Spring Boot, аутентификацией SASL/Kerberos и применением Avro Schema Registry для продюсера. В процессе разработки я столкнулся с рядом проблем, решение которых потребовало усилий и времени. Надеюсь, мой опыт будет полезен разработчикам, которым предстоит решать подобные задачи.

Ведение

По техзаданию необходимо было создать сервис, который в зависимости от топика отправлял бы сообщения или на один инстанс Kafka (строку, с простой авторизацией с помощью SSL), или на другой, но уже с сериализацией и аутентификацией через Kerberos.

Обмен данными с обоими серверами Kafka использует шифрование с помощью сертификатов, второй использует дополнительную аутентификацию клиента с помощью Kerberos.

Приложение деплоится на Томкат.

Немного теории

Apache Kafka — популярная платформа потоковой передачи данных, часто используемая для создания высоконагруженных распределенных систем. Для обеспечения безопасности могут применятся механизмы аутентификации SASL/Kerberos. Avro Schema Registry используется для управления схемами данных и обеспечивает совместимость версий сообщений и валидации отправляемых\принимаемых данных.

Kerberos — это протокол аутентификации, разработанный для обеспечения безопасного доступа к сети. Он работает на основе системы тикетов, предоставляемых центральным сервером аутентификации (KDC — Key Distribution Center).

SSL (Secure Sockets Layer) используется для шифрования данных и аутентификации сервера и/или клиента. Чтобы настроить SSL для соединения с Kafka с помощью JKS (Java KeyStore), необходимо создать и настроить truststore и keystore файлы. Truststore содержит сертификаты доверенных удостоверяющих центров (CA), а keystore содержит закрытые ключи и сертификаты для идентификации сервера или клиента. В Kafka клиентской конфигурации необходимо указать местоположения этих файлов и их пароли.

Настройка Spring Boot и зависимостей

Первым шагом добавляем необходимые зависимости в pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>6.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>

Настройка SSL

В конфигурации соединения необходимо указать путь к jks файлу и пароля к нему. В примере ниже это producerRetailFactory.

Настройка Kerberos

Настройка Kerberos требует наличия конфигурационного файла krb5.conf и указания пути к keytab. Файл krb5.conf должен содержать информацию о KDC (Key Distribution Center). В примере ниже это producerMainFactory.

[libdefaults]
default_realm = TEST_DOMAIN.TRUS.TCOMPANY
[realms]
TEST_DOMAIN.TRUS.TCOMPANY = {
  kdc  = DC00.TEST_DOMAIN.trus.tCOMPANY
  admin_server = S000.TEST_DOMAIN.trus.tCOMPANY
  kdc  = DC01.TEST_DOMAIN.TRUS.TCOMPANY
}
[domain_realm]
.TEST_DOMAIN.trus.tCOMPANY = TEST_DOMAIN.TRUS.TCOMPANY
TEST_DOMAIN.trus.tCOMPANY = TEST_DOMAIN.TRUS.TCOMPANY

keytab это двоичный файл содержащий пары Kerberos принципалов и их ключей (полученных с использованием Kerberos пароля). Эти файлы используются для аутентификации, без ввода пароля.

Обычно эти данные предоставляются администраторами сервисов.

Конфигурация для двух серверов Kafka

Так как у нас два разных брокера Kafka, требующих разные типы аутентификации, мы создадим специальный класс для конфигурирования. Один будет использовать SSL для аутентификации, другой — SSL, SASL/Kerberos.

Важно, в классе конфигурации, путь к krb5.conf должен быть отдельно указан (иначе будет использован системный) и указывать его нужно не через props (как остальные параметры), а через установку переменной окружения.

System.setProperty("java.security.krb5.conf", krb5ConfFile);
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {

    private final Environment environment;
    private final JMServiceDao jmServiceDao;

    public KafkaConfig(Environment environment, JMServiceDao jmServiceDao) {
        this.environment = environment;
        this.jmServiceDao = jmServiceDao;
    }

    @Bean
    public ProducerFactory<String, Object> producerRetailFactory() {
        Map<String, Object> props = new HashMap<>();

        String hostname = Optional.ofNullable(System.getenv("HOSTNAME")).filter(Predicate.not(String::isBlank)).orElse(environment.getProperty("COMPUTERNAME", "Localhost"));
        props.put(CLIENT_ID_CONFIG, hostname + "_" + environment.getProperty("spring.application.name"));
        props.put(BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("KafkaRetailBrokerHost", jmServiceDao.getKafkaRetailBrokerHost()));

        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        props.put("security.protocol", "SSL");
        props.put("ssl.enabled.protocols", "TLSv1.2");
        props.put("ssl.truststore.location", environment.getProperty("KafkaRetailSSLKeyStoreLocation" + "kafka.retail.truststore.jks",
                environment.getProperty("catalina.base") +
                        jmServiceDao.getKafkaRetailSSLKeyStoreLocation()));

        props.put("ssl.truststore.password", environment.getProperty("KafkaRetailSSLKeyStorePwd", jmServiceDao.getKafkaRetailSSLKeyStorePassword()));

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public ProducerFactory<String, Object> producerMainFactory() {
        // важно! путь к keyTab должен быть экраннированным для windows path
        String keyTabFile = Paths.get(environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainKeytabLocation()).toAbsolutePath().normalize().toString().replace("\\", "/");
        String krb5ConfFile = Paths.get(environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainKrb5ConfLocation()).toAbsolutePath().normalize().toString().replace("\\", "/");
        String jksFile = Paths.get(environment.getProperty("KafkaMainSSLKeyStoreLocation" + "kafka.retail.truststore.jks",
                environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainSSLKeyStoreLocation())).toAbsolutePath().normalize().toString().replace("\\", "/");


        Map<String, Object> props = new HashMap<>();
        String hostname = Optional.ofNullable(environment.getProperty("HOSTNAME")).filter(Predicate.not(String::isBlank)).orElse(environment.getProperty("COMPUTERNAME", "Localhost"));
        props.put(CLIENT_ID_CONFIG, hostname + "_" + environment.getProperty("spring.application.name"));
        props.put(BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("KafkaMainBrokerHost", jmServiceDao.getKafkaMainBrokerHost()));

        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("specific.avro.reader", "true");

        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.jaas.config", """
                com.sun.security.auth.module.Krb5LoginModule required serviceName="kafka"
                useKeyTab=true storeKey=true keyTab="%s" principal="%s";
                """.formatted(keyTabFile, jmServiceDao.getKafkaMainPrincipal()));
        props.put("session.timeout.ms", "45000");
        props.put("schema.registry.url", jmServiceDao.getKafkaMainSchemaRegistryUrl());
        // параметр устанавливается не через props
        System.setProperty("java.security.krb5.conf", krb5ConfFile);

        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1,TLSv1");
        props.put("ssl.truststore.location", environment.getProperty("KafkaMainSSLKeyStoreLocation" + "kafka.main.truststore.jks", jksFile));

        props.put("ssl.truststore.password", environment.getProperty("KafkaMainSSLKeyStorePwd", jmServiceDao.getKafkaMainSSLKeyStorePassword()));
        //log.debug("kafka props {}", props);
        //log.debug("system env {}", System.getProperties());
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplateForRetail() {
        return new KafkaTemplate<>(producerRetailFactory());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplateForMain() {
        return new KafkaTemplate<>(producerMainFactory());
    }
}

В этом классе необходимо обратить внимание (кроме обязательных параметров), на параметр sasl.jaas.config и здесь я столкнулся с первой проблемой, оказалось, что нигде нет проверки на правильность пути к keyTab файлу. Разработка велась в ОС Windows, и windows путь, например C:\ra_projects\Java\apache-tomcat-10.1.10\conf\keystore\tkafka.keytab из-за отсутствия экранирования слешей (путь хранится в БД), преобразуется во внутренностях класса связанного с авторизацией, в C:a_projectsJava�pache-tomcat-10.1.10confkeystore kafka.keytab, в Exception я получал только, что авторизация не удалась:

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user

И только в режиме Debug я увидел кривой путь к файлу keyTab. Добавил экранирование. Так же завел issue.

Вторая проблема, я рандомно получал ошибку:

javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will retry.

Оказалось, что мне предоставили неправильный файл krb5.conf, содержащий неправильные пути к KDC доменам. А в последствии, что распарсенный файл krb5.confхранится кэшированным, и даже если его содержимое поменять, он не будет перечитан до перезапуска Tomcat сервера, на который деплоилось мое приложение.

Сериализация данных

В конфигурационном классе KafkaConfig необходимо указать домен к хранилище схем:

props.put("schema.registry.url", jmServiceDao.getKafkaMainSchemaRegistryUrl());

Для отправки данных в Kafka был написан класс, который использует две конфигурации в зависимости от данных, которые были получены для отправки:

@Service
@Slf4j
public class JMServiceImpl implements JMService {
    private enum KafkaType {
        KAFKA_RETAIL,
        KAFKA_MAIN
    }

    private final JMServiceDao jmServiceDao;
    // авторизация через SSL, тут продюсер для отправки "сырых" данных
    private final KafkaTemplate<String, Object> kafkaTemplateForRetail;
    // авторизация через SSL,Kerberos, тут продюсер для отправки отправки сериализованных данных
    private final KafkaTemplate<String, Object> kafkaTemplateForMain;

    @Autowired
    public JMServiceImpl(JMServiceDao plcPackageDao,
                         @Qualifier("kafkaTemplateForRetail") KafkaTemplate<String, Object> kafkaTemplateForRetail,
                         @Qualifier("kafkaTemplateForMain") KafkaTemplate<String, Object> kafkaTemplateForMain) {
        this.jmServiceDao = plcPackageDao;
        this.kafkaTemplateForRetail = kafkaTemplateForRetail;
        this.kafkaTemplateForMain = kafkaTemplateForMain;
    }


    @Override
    public Mono<KafkaSendOKResponse> sendMessage(QueueMessageId queueMessageId) throws IOException {
        log.info("Попытка отправить сообщение queueMessageId = {}", queueMessageId.getQueueMessageId());
        QueueMessage message = jmServiceDao.getQueueMessage(queueMessageId.getQueueMessageId());
        if (message == null) {
            throw new ResponseStatusException(NOT_FOUND, "В теле запроса указан несуществующий идентификатор сообщения в очереди");
        } else {
            KafkaType messagingSystem = KafkaType.valueOf(message.getMessagingSystemCode());
            Object messageForSend;
            KafkaTemplate<String, Object> kafkaTemplate = switch (messagingSystem) {
                case KAFKA_MAIN -> {
                    ObjectMapper objectMapper = new ObjectMapper();
                    //строка с данными в JSON
                    JsonNode rootNode = objectMapper.readTree(message.getMessageData());
                    //нужна сериализация
                    SendPTI ptiMessage = new SendPTI();
                    ptiMessage.setSiebelId(rootNode.get("siebel_id").asText());
                    ptiMessage.setPti(ByteBuffer.wrap(rootNode.get("pti").asText().getBytes(StandardCharsets.US_ASCII)));
                    ptiMessage.setPtiDate(Instant.parse(rootNode.get("pti_date").asText()));
                    messageForSend = ptiMessage;
                    yield kafkaTemplateForMain;
                }
                case KAFKA_RETAIL -> {
                    //отправляем просто строку
                    messageForSend = message.getMessageData();
                    yield kafkaTemplateForRetail;
                }
                default -> throw new IllegalArgumentException("Неподдерживаемая система: " + messagingSystem);
            };

            log.debug("Отправляем в топик {} данные {}", message.getTopic(), messageForSend);
            return Mono.fromFuture(kafkaTemplate.send(message.getTopic(), messageForSend))
                    .map(result -> {
                        log.debug("Результат {} отправки сообщения {}", result, queueMessageId.getQueueMessageId());
                        LocalDateTime localDateTime = result.getRecordMetadata().hasTimestamp()
                                ? LocalDateTime.ofInstant(Instant.ofEpochMilli(result.getRecordMetadata().timestamp()), ZoneId.systemDefault())
                                : null;
                        return new KafkaSendOKResponse()
                                .setQueueMessageId(queueMessageId.getQueueMessageId())
                                .setSendStatus(1)
                                .setRecordMetadata(result.getRecordMetadata().toString())
                                .setSentDate(localDateTime);
                    })
                    .doOnError(ex -> {
                        log.error("Произошла ошибка {} при отправке сообщения {}", ex, queueMessageId.getQueueMessageId());
                    });
        }
    }
}

Выборка нужной конфигурации (бина) осуществляется с помощью аннотации @Qualifier. Для KAFKA_MAIN используется Avro сериализатор, для KAFKA_RETAIL отправляется строка.

Для генерации класса SendPTI для сериализации отправляемых\принимаемых данных используется Maven плагин:

<plugin>
	<groupId>org.apache.avro</groupId>
	<artifactId>avro-maven-plugin</artifactId>
	<version>1.11.3</version>
	<executions>
		<execution>
			<phase>generate-sources</phase>
			<goals>
				<goal>schema</goal>
			</goals>
			<configuration>
				<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
				<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
			</configuration>
		</execution>
	</executions>
</plugin>

В нем нужно указать путь к *.avcs файлу, содержащему исходную схему, мне был предоставлен такой:

{
  "type": "record",
  "name": "SendPTI",
  "namespace": "com.technology.kafkaproducer.jmessageservice.SendPTI",
  "fields": [
    {
      "name": "siebel_id",
      "type": "string"
    },
    {
      "name": "pti",
      "type": {
        "type": "bytes",
        "scale": 5,
        "precision": 7,
        "connect.version": 1,
        "connect.parameters": {
          "scale": "5",
          "connect.decimal.precision": "7"
        },
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "logicalType": "decimal"
      }
    },
    {
      "name": "pti_date",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    }
  ],
  "connect.name": "ru.COMPANY.schema.SCORF.SendPTI"
}

При сборке проекта будет запускаться плагин и будет сгенерирован POJO класс, в моем примере выше это класс SendPTI.

Когда все уже было готово, я столкнулся с другой проблемой; доступ в сеть у меня осуществляется через рабочий прокси, но при попытке отправить сообщение в Kafka, происходила безуспешная попытка подключения к Schema Registry и приложение падало с ошибкой:

2024-05-29 11:09:15.754 [http-nio-8081-exec-2] ERROR i.c.k.s.client.rest.RestService - Failed to send HTTP request to endpoint: https://schema-kafka-test/subjects/SCORF.SendPTI-value/versions/latest/ java.io.IOException: Unable to tunnel through proxy. Proxy returns "HTTP/1.1 407 Proxy Authentication Required"

Решением стало добавление дополнительных параметром при запуске моего локального (разработческого) сервера Томкат:

-Dhttps.proxyHost=proxy.host
-Dhttps.proxyPort=8080
-Dhttps.proxyUser=user
-Dhttps.proxyPassword=superSecretPassword

После этого я смог успешно отправлять данные из одного сервиса на два разных сервера Kafka с разными типами авторизации и с сериализацией и без.

Мира!

Комментарии (2)


  1. Khamsha
    11.06.2024 03:28

    А сколько времени ушло в итоге, чтобы все эти трудности преодолеть? Мы недавно возились с подобной задачей на питоне и докере, и потратили несколько вечеров троих инженеров


    1. foxyrus Автор
      11.06.2024 03:28

      Делал один, дня 4 чтобы подобрать все нужны параметры.