Как известно, Apache Kafka позволяет позволяет подключаться к кластеру по нескольким протоколам. Можно работать без авторизации, используя протокол PLAINTEXT. Поддерживаются также протоколы с безопасностью (SASL_PLAINTEXT, SASL_SSL, SSL), но их использование требует непростой настройки и понимания нескольких смежных технологий (как минимум, SASL и JAAS). В данной статье я расскажу, что PLAINTEXT не так прост и небезопасен, как кажется, и что в нём можно использовать почти всю мощь кафкианского ACL — дёшево и сердито.

Постановка задачи

Наша цель проста и понятна. Есть kafka-кластер, есть приложения, которые к нему подключаются, хочется настроить в этой конструкции безопасности и при этом не переутомиться. Одним приложениям можно одно и нельзя другое, а другим наоборот. Есть брокеры, которым можно в наш кластер вступить, есть клиенты, которые хотят что-то в кластере сделать.

Хочется настроить правила безопасности, при этом не запутаться в настройках и не разворачивать рядом дополнительных решений. Достичь простую цель простыми средствами. Может быть когда-нибудь потом замутить что-то более хитрое с помощью JAAS, но не сейчас.

Для достижения поставленной цели придётся немного попрограммировать и разобраться в коде Kafka. На всякий случай уточню (это не принципиально, данная механика существует давно, но какие-то нюансы могут меняться в версиях), что я использую собранную из исходных кодов версию на основе https://github.com/apache/kafka/tree/trunk , она сейчас обозначается как 4.1.0-SNAPSHOT. В связи с этим напомню о двух своих предыдущих статьях:

Первый взгляд, или Немного теории

Про Apache Kafka написано много книг, пересказывать их не буду, остановлюсь на главном. Кластер может быть утроен одним из двух способов. Старый, на основе Apache ZooKeeper устарел, мы его рассматривать не будем (тем более, что он требует установки и настройки дополнительного ПО), с версии 4.0 его уже совсем нет, выпилили из кода. Новый, на основе реализации алгоритма Raft (KRaft) является целевым, ничего дополнительно не требует, и потому нам он подходит. В топологии есть узлы двух типов, брокеры и контроллеры, и тем, какие роли выполняет данный сервер, можно управлять. Роли можно совмещать. У меня простая узловая конфигурация (для целей данной демонстрации это не существенно), и потому в конфигурационном файле моего сервера стоит просто

process.roles=broker,controller

Далее, сервер слушает входящий трафик, и это задаётся вот такими настройками:

listeners=PLAINTEXT://:9092,CONTROLLER://:9093,
                            SASL_PLAINTEXT://:9094 controller.listener.names=CONTROLLER

Это значит, что по порту 9092 у меня брокер получает данные по протоколу PLAINTEXT, на порту 9094 у меня SASL_PLAINTEXT (но это нам сейчас не интересно), а порт 9093 занимает контроллер. Есть и другие настройки топологии, они важны, но о них мы тут говорить не будем. Как было выше сказано, есть ещё протоколы SASL_SSL и SSL. Это означает, что с одним и тем же сервером на разных портах можно общаться с использованием различных аутентификационных механизмов.

Слушает порты kafka с помощью каналов, и о каналах на основе протокола PLAINTEXT в документации сказано так: "Un-authenticated, non-encrypted channel". Это обещает очень не многое в плане аутентификации... Но нам хочется именно PLAINTEXT (может потому, что мы считаем оверхед других протоколов неприемлемым, а может просто так), в утверждения о небезопасности не верим, и потому продолжаем наши изыскания.

Второй взгляд, или Начинаем копать

Kafka предоставляет множество настроек механизма авторизации, но почти все они к PLAINTEXT отношения не имеют. Но есть одна, которая сразу же кажется перспективной, authorizer.class.name. Она нам намекает, что есть возможность кастомизации процесса путём разработки собственного обработчика процесса авторизации. В ней надо указать имплементацию интерфейса org.apache.kafka.server.authorizer.Authorizer. Для лучшего понимания, что он делает, я приведу перевод из javadoc:

  • Брокер создаёт экземпляр авторизатора, если установлено значение в переменной 'authorizer.class.name';

  • Брокер настраивает и запускает экземпляр авторизатора. Авторизатор начинает загружать свои метаданные;

  • Брокер запускает SocketServer чтобы принимать входящие соединения и обрабатывать запросы;

  • Для каждого слушателя (listener'а) SocketServer ожидает завершения загрузки метаданных прежде чем начать принимать соединения;

  • Брокер принимает соединения. Для каждого соединения он осуществляет аутентификацию, после чего принимает kafka-запросы. для каждого запроса брокер вызывает метод #authorize(AuthorizableRequestContext, List)} авторизатора чтобы авторизовать запрошенные действия.

Схема процесса авторизации по ACL
Схема процесса авторизации по ACL

В стандартной поставке есть только одна рабочая (не тестовая) имплементация авторизатора, org.apache.kafka.metadata.authorizer.StandardAuthorizer. Из приведённого описания вытекает, у StandardAuthorizer достаточно хитрая модель состояния (как минимум, ему надо инициализироваться), но это мы сейчас пропустим. Таким образом, инициализировать авторизатор можно так:

authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
early.start.listeners=PLAINTEXT,CONTROLLER
allow.everyone.if.no.acl.found={true|false}

То есть наши слушатели готовы работать сразу, завершения инициализации не ждём, дефолтное поведение, если подходящей записи ACL не нашлось, как-то задаём. StandardAuthorizer работает с данными ACL, которые можно задать через командный клиент kafka-acl.(sh|bat). Он хорошо описан в документации, о нём подробно писать не буду. Единственное, что нужно понимать, что есть 1) ресурсы, 2) действия над ними, которые 3) разрешаются или запрещаются 4) принципалам. Воспользоваться kafka-acl.(sh|bat) можно только при указанном авторизаторе.

В общем, для протокола PLAINTEXT задать авторизатор можно, и можно настроить правила доступа к ресурсу. Но как клиент, подключающийся к моему брокеру по протоколу 9092, укажет как его зовут (принципала)? А никак. Это технически не предусмотрено. Более точно, при установлении соединения, полагающийся каналу с PLAINTEXT PrincipalBuilder конструирует только анонимного принципала, "User:ANONYMOUS" (строковое представление принципала состоит из типа принципала и его имени, оба произвольные строки). Соответственно, в таком канале все подключения с точки зрения аутентификации не различимы.

Но зацепиться есть за что, так как подключаясь к серверам кафки мы, на самом деле, сообщаем о себе не так уж и мало. И никто не может нам запретить написать свой собственный авторизатор, и потому мы копаем дальше!

Третий взгляд, или Копаем дальше

Итак, принципала (имя пользователя) мы из клиента для подключения передать не можем, это факт. Но что-то же можем? Да, можем. В конфигурационном файле kafka-клиента мы можем идентификатор клиента, то есть сообщить брокеру, что наше приложение зовут, например, client.id=my.super.client (произвольная строка). У брокеров в файле meta.properties тоже есть идентификатор, broker.id=1 (только целое число).

Осознав этот факт, мы можем реализовать наследника стандартного авторизатора, например так (см. код в https://github.com/kvmorozov/kafka-utils/tree/main/kafka-lib):

package org.apache.kafka.metadata.authorizer;

import com.yammer.metrics.core.Counter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import java.util.Collections;
import java.util.List;

import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;

public class NonStandardAuthorizer extends StandardAuthorizer {

    private static String PRINCIPAL_TYPE_BROKER = "Broker";
    private static String PRINCIPAL_TYPE_CLIENT = "Client";

    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(NonStandardAuthorizer.class);
    private final Counter authCount = KafkaYammerMetrics.defaultRegistry().newCounter(metricsGroup.metricName("AuthCount", Collections.emptyMap()));

    private final ResourcePattern DEFAULT_RESOURCE_PATTERN = new ResourcePattern(TOPIC, "name", LITERAL);

    @Override
    public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
        authCount.inc();

        if (actions.isEmpty())
            if (requestContext instanceof RequestContext rc) {
                return super.authorize(rebuildRqWithPricipalType(rc, PRINCIPAL_TYPE_CLIENT), List.of(new Action(AclOperation.DESCRIBE,
                        DEFAULT_RESOURCE_PATTERN, 0, true, true)));
            } else
                return List.of(defaultResult());
        else {
            if (requestContext instanceof RequestContext rc) {
                String principalType = switch (actions.getFirst().operation()) {
                    case CLUSTER_ACTION, DESCRIBE_CONFIGS, IDEMPOTENT_WRITE -> PRINCIPAL_TYPE_BROKER;
                    default -> PRINCIPAL_TYPE_CLIENT;
                };

                return super.authorize(rebuildRqWithPricipalType(rc, principalType), actions);
            } else
                return super.authorize(requestContext, actions);
        }
    }

    private RequestContext rebuildRqWithPricipalType(RequestContext rc, String principalType) {
        return new RequestContext(rc.header, rc.connectionId, rc.clientAddress,
                new KafkaPrincipal(principalType, rc.header.clientId()),
                rc.listenerName, rc.securityProtocol, rc.clientInformation, false);
    }
}

Отмечу ключевые моменты:

  • Код размещаем в пакете org.apache.kafka.metadata.authorizer — это не принципиально, но даёт возможность вызова методов со скоупом package (здесь это defaultResult()).

  • Определяем 2 типа принципалов (можно считать это, примерно, неймспейсами), для брокеров и для клиентов.

  • За основу берём реализацию StandardAuthorizer, переопределяем только метод authorize, подменяя стандартный принципал (напомню, анонимный), кастомный, сконструированный из нового (это не обязательно) типа принципала "Client" и извлечённого из заголовка идентификатора клиента. Для нетипичных и пограничных случаев мы сохраняем стандартное поведение.

  • В некоторых ситуациях аутентификационный контекст теряется, список запрашиваемых действий приходит пустым. Это бывает, когда скоуп вызова не содержит явного указания на ресурс. Например, в некоторых случаях возможно указание пустого списка топиков. Этот случай надо предусмотреть.

  • И я добавляю JMX метрику, потому что создатели кафки об этом не позаботились. Это простой счётчик, который можно просмотреть как угодно или вывести в Grafana. Тут тоже есть некоторый нюанс и элемент хака, так как в kafka, как оказалось, не предусмотрены метрики типа counter. Но это тема для отдельного разговора.

  • Отличить клиента от брокера вполне можно по имени listener'а: понятно, что брокер будет подключаться к CONTROLLER, а клиенты к PLAINTEXT. Но они не обязаны называться именно так, поэтому здесь я для простоты смотрю только на операцию.

Таким образом, представленная реализация извлекает из заголовков контекста вызова информацию об идентификаторе клиента и строит принципал, который далее уже стандартным образом проверяется по настроенным ACL.

Для полноты картины отмечу, что можно реализовать также свой PrincipalBuilder, задав его в principal.builder.class, но, в отличие от авторизатора, в нём нам доступен только контекст аутентификации, который для PLAINTEXT ничего интересного не содержит (название протокола, адрес клиента и имя слушателя). Но, в принципе, можно в каких-то случаях рассмотреть и такую возможность.

Сам класс, конечно, надо собрать в jar и каким-то образом разместить в classpath kafka.

Последние штрихи

Чтобы вся конструкция завелась, нам надо:

  • запустить кластер, то есть брокеры должны подключиться к контроллерам;

  • добавить записи в ACL — как было сказано выше, добавить ACL на кластере с выключенной авторизацией нельзя (это логично, так как авторизатор отвечает и за жизненный цикл данных ACL).

Поэтому нам придётся пойти на небольшую уловку и реализовать вспомогательный авторизатор, чтобы придать первоначальный импульс. Он совершенно простой:

package org.apache.kafka.metadata.authorizer;

import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;

import java.util.List;

public class BootstrapAuthorizer extends StandardAuthorizer {


    @Override
    public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {

        return List.of(AuthorizationResult.ALLOWED);
    }
}

Комментировать тут нечего. Таким образом, для первоначальной настройки мы запускаемся на BootstrapAuthorizer, и даём такие права:

  • --add --allow-principal Broker:1 --operation ClusterAction --cluster

  • --add --allow-principal Client:my.super.client --operation Alter --cluster

Клиент с идентификатором my.super.client это тот клиент, под которым в дальнейшем будут осуществляться вызовы утилиты kafka-acl. После выполнения данной первоначальной настройки можно переключаться на NonStandardAuthorizer и дальнейшую настройку прав выполнять уже из под него.

Выводы

Сценарий использования данной технологии придумать не сложно. Если наше бизнес-приложение работает в контролируемом окружении, мы можем подготовить для его kafka-клиентов простой конфигурационный файл, содержащий подходящее значение client.id, для которого настроены необходимые этому приложению права.

Мы можем организовать и более сложную схему, когда client.id может менять со временем и иметь смысл токена, получаемого клиентом и авторизатором из единого источника. Это уже более извращённая идея, поскольку штатный способ аутентификации по токену реализован для протокола SASL_PLAINTEXT, о чём, возможно, будет рассказано в следующей статье.

Комментарии (0)