Есть простая, можно сказать, типовая – задача, передать данные из системы «А» в систему «Б». А – классическая legacy-трехзвенка из 00х с IIS-MSSQL, «Б» - новая-нулевая-микросервисная с внутренней шиной на apache kafka и собственным ETL на Apache NiFi, развернута в k8s. Направление передачи – из «А» в «Б», по расписанию , в общем ничего сложного – «Работенка на 5 минут»: идем в NiFi делаем QueryDatabaseTable->PublishKafkaRecord и продолжаем спать – но тут начинаются «Нюансы»(ТМ) в виде ИБ, которая говорит, что прямая интеграция корпоративных систем – харам, архитектуры которой (дикие люди!) не нравится хождение в чужую БД (Подержи моё пиво! Я сто раз так делал!) и прочих скучных регламентов, требующих «наличия аутентификации», «направления установления соединения совпадающего с направлением передачи» и тому подобных глупостей.

И вот тут на сцену выходит корпоративная интеграционная шина – (low|no)-code решение, которое умеет в расписания, подключение к ИС по различным протоколам (в том числе и *dbc), передачу данных с помощью REST\SOAP, аутентификацию, обработку ошибок, алертинг и кучу других вещей. Оооок, шина по расписанию будет ходить в БэДэ (Или не БэДэ – там уже видно будет), забирать данные и передавать… А куда, собственно, передавать?

Первый вариант – «в kafka’у!» хорош примерно всем – кроме реализации. Собственно, бинарный протокол kafka’и шина не умеет, ИБ не умеет в инспекцию этого самого протокола, ingress-nginx контроллер не умеет (Нормально – не умеет, ssl-passthrough в данном случае не очень-то «нормально») в публикацию kafka’и, а согласовывать с ИБ публикацию брокеров через LB – удачи, пацаны. Плюс нормальная аутентификация\авторизация на kafka’е – тот еще геморрой между нами говоря. Отметаем.

Вариант «Бэ!» - делаем отдельный интеграционный сервис на каком-нибудь fastapi – пятью минутами уже не обойдешься, а когда количество интеграций переползет за первый десяток процесс может стать ув-ле-ка-тель-ным. Нет, ничего принципиально невозможного – но in-scale не дешево.

Вариант «Цэ» - используем тот же NiFi. HandleHTTPRequest-PublishKafka-HandleHTTPResponse, можно и за три минуты управиться. Правда аутентификацию «из коробки» он умеет только по сертификатам, а тот же basic уже надо делать на бизнес-слое, а стандартный корпоративный OIDC и вовсе употеешь. Опять же – http-сервер из NiFi такой себе, HA нет (NiFi-cluster он про LB, а не HA\FT), валидацию данных делать не так, чтобы удобно. Можно конечно. Но – душа просит, просит чего-то…

И тут в голову приходит kafka-rest. А почему бы и не да? Рест-адаптер для шины есть, он уже готов и делать его не надо все плюшки «варианта А» без минусов реализации (Не, ну понятно, что своих минусов там будет – лопатой ешь, начиная от stateful consuming и необходимостью работы через sticky-sessions, заканчивая… «Об этом я подумаю завтра!»(с)). Можно сделать за полчаса «из палка-и-веревка», все как мы любим.  

Собственно деплой в 4 строчки (KAFKA_REST_BOOTSTRAP_SERVERS и KAFKA_REST_SCHEMA_REGISTRY_URL в переменных) сервис-ингресс… работает. Надо только аутентификацию с авторизацией прикрутить – и можно идти за кофе.  Смотрим руководство (REST Proxy Security | Confluent Documentation) – а оно только в basic умеет, авторизация – вообще внутри kafka’и, что – см. выше. Лад-нень-ко, скотч вроде еще не кончился – прикрутим к этому делу oauth2-proxy для аутентификации, а авторизацию запилим на уровне ингресса – сделаем отдельный путь до каждого топика и запретим методы, отличные от POST:

apiVersion: networking.k8s.io/v1 
kind: Ingress 
metadata:   
	name: custom-kafka-rest   
	namespace: <namespace>   
	annotations:     
	nginx.ingress.kubernetes.io/rewrite-target: /topics/custom-int-test     
	nginx.ingress.kubernetes.io/configuration-snippet: |       
		if ($request_method != POST) {         
		return 403;       
	} 
spec:   
	ingressClassName: nginx
	rules:
	- host: <host>
 http:
    paths:
      - path: /krest/topics/custom-int-test
       pathType: Exact
       backend:
    	    service:
    		   name: custom-kafka-oauth2
    		   port:
   			   name: http

С запретом методов косо-криво конечно, но работает. Дальше собственно oauth2-proxy – поскольку проверять мы будем по сути только наличие токена и интерактивная работа пользователей нас не интересует – то задача конфигурирования вот этого вот всего значительно упрощается.

Создаем в keycloak клиента, тип confidential, service account + авторизацию включаем, остальное можно убрать, создаем клиентскую роль добавляем её клиенту, проверяем – токен получается нормально. Ок. Идем с этим токеном в oauth2-proxy – не ок, keycloak-oidc ругается на отсутствующий audience, приходится делать еще и соответствующий mapper. Проверяем – вроде норм.

Делаем сайдкар к контейнеру с kafka-rest:

- --skip-jwt-bearer-tokens=true # Звучит страшно, переводится как «пропускаем всех с токеном»
- --insecure-oidc-allow-unverified-email=false
- --api-route=/topics/custom-int-test # вместо redirect’а по всем неавторизованным запросам к этому адресу отдаем 401
- --show-debug-on-error
- --redirect-url=https://<host>/krest/oauth/callback # нафиг не нужно в данном случае – но без него не работае
- --scope=openid
- --provider=keycloak-oidc # Нам нужен контроль доступа привязанный к ролям
- --oidc-issuer-url=https://<host>/auth/realms/<realm>
- --http-address=0.0.0.0:4180
- --allowed-role=custom-kafka-rest:Krest #Специфично для KC, для других провайдеров не заработает
- --proxy-prefix=/krest/oauth
- --upstream=http://127.0.0.1:8082/
- --email-domain=<домен>
- --ssl-insecure-skip-verify
- --set-basic-auth=false
- --client-id=custom-kafka-rest # Не нужно, но без этого не работает
- --client-secret='not used'    # Не нужно, но без этого не работает
- --cookie-secret='not used     # Не нужно, но без этого не работает

В процессе приходится подсовывать ненужное-ненужно в конфигурацию для удовлетворения «формальных требований», но в общем ничего сложного. На всякий случай - nginx.ingress.kubernetes.io/auth-signin и nginx.ingress.kubernetes.io/auth-url в аннотацию ингресса класть не нужно – «по условию задачи» мы просто проверяем предоставляемый токен, посылая всех на... 401 в случае его отсутствия.

В процессе выявляется не очень приятная штука – сделать «нормальную авторизацию» на одном oauth2-proxy похоже не получится. Указать несколько апстримов, разделив их путями – можно, указать несколько разрешенных ролей – тоже, а вот сделать так, чтобы в топик А мог писать владелец роли Х, а в топик Б – владелец роли У и никак иначе – видимо, нет. Конечно, всегда можно сделать для отдельных ингрессов отдельный сайдкар – но это уже «низкий класс, не чистая работа». Впрочем, пока – «пренебречь, вальсируем!» - аутентификацию с авторизацией мы, с грехом пополам, сделали – теперь надо родить контроль входных данных.

И тут на помощь приходит паркур schema-registry. В авро-фигавро и прочую бинарную сериализацию наша шина не умеет, а вот в json вполне себе. Сервисы, расположенные «за kafka’ой» ожидают получить что-то вроде:

{
    "pointName": "125",
    "events": [
        {
            "Value": 0.000000000000000e000,
            "timestamp": 1694537700000000000,
            "quality": 0,
            "annotatiton": None,
        },
        {
            "Value": 0.000000000000000e000,
            "timestamp": 1694537700000000000,
            "quality": 0,
            "annotatiton": None,
        },
    ]

Теоретически – типы значений могут быть разными (А conditional-схемы нуууу… эээ… всегда можно сделать несколько схем, правда?) – но у нас достаточно простой случай: что в БД есть, то и кладем – а тип данных в таблице прибит гвоздями.  Нарисуем под это дело jsonschema’у:

{
  "title": "JSONv3",
  "description": "Схема данных TSDS",
  "type": "object",
  "properties": {
    "pointName": {
      "type": "string"
    },
    "events": {
      "type": "array",
      "items": {
        "properties": {
          "annotation": {
            "type": "null"
          },
          "value": {
            "type": "number"
          },
          "timestamp": {
            "type": "number"
          },
          "quality": {
            "type": "number"
          }
        }
      }
    }
  }
}

И запихнем её в schemaregistry.

Дальше необходимо определиться с версией API kafka-rest – их собственно две, вторая и третья (Традиционно - луДшая). Увы, нам подходит именно вторая, т.к. третья работу со схемами не поддерживает. V2 ожидает получить данные в следующем (минимальном) формате: {"records":[{"value":{<"Что":"вам угодно">}}]}. Ну, собственно заворачиваем желаемую сервисом структуру в требуемую REST-proxy обертку и пробуем:

curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" --data '{"value_schema_id": <номер схемы>, "records":[{"value":{<данные>}}]}' "https://<host>/krest/topics/custom-int-test/" –k

#таки да:

<Response [200]> {"offsets":[{"partition":0,"offset":27,"error_code":null,"error":null},{"partition":0,"offset":28,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":<номер схемы>}

#А в случае, если мы эти данные чуть-чуть, самую капельку, малость немножко поломаем – таки нет:

<Response [422]> {"error_code":42203,"message":"Conversion of JSON to Object failed: Failed to convert JSON using JSON Schema: #/events/0/quality: expected type: Number, found: String"}

Успех? Хм… не совсем. А что будет, если вместо данных, не соответствующих схеме мы попробуем загрузить данные _без_ схемы? А ничего. Совсем. Нет схемы – нет валидации – нет проблем.

Бида. Бида-бида даже. Нет, теоретически в конфлюэнтовской реализации есть broker-side контроль схем – но он проблемы с отсутствующей схемой не решает. Хм. А как собственно мы понимаем, используется схема или нет? Документация говорит, что по заголовку Content-Type:

application/vnd.kafka[.embedded_format].[api_version]+[serialization_format]

application/vnd.kafka.jsonschema.v2+json – т.е. есть «jsonschema» в качестве embedded format – проверяем, обычный json – не проверяем. Ну? Поняли, да? Иех, сгорел сарай – гори и хата:

Дети, не делайте так! (Взрослые – скажите, как правильно?)

nginx.ingress.kubernetes.io/configuration-snippet: |   if ($request_method != POST) {     return 403;   }   if ($content_type != "application/vnd.kafka.jsonschema.v2+json") {           return 405;   }

фигак!

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"value":{<данные>}}]}' "https://<host>/krest/topics/custom-int-test/" –k

<Response [405]> <html>

<head><title>405 Not Allowed</title></head>

<body>

<center><h1>405 Not Allowed</h1></center>

<hr><center>nginx</center>

</body>

</html>

Тут пытливый читатель может спросить: а что будет, если схема есть и данные ей соответствуют – просто схема эта, гм, другая? На это хочется спросить: «Ты вообще за меня или за медведя?!» - а написать придется что-то вроде: «данная ситуация будет обрабатываться на сервисе обработчике данных, читающем содержимое топика ХХХ».

В общем, осталось только из таблички

create table dbo.DataHour
(
 Date        date not null, 
 Hour        int  not null, 
 ObjectId    int  not null, 
 LayerId     int  not null 
 constraint FK_DataHour_Layer 
 references dbo.Layer,
 ParameterId int  not null
 constraint FK_DataHour_Parameter
 references dbo.Parameter,
 Value       float, 
 ValueEnum   int, 
 SaveDate    smalldatetime, 
 constraint PK_DataHour 
 primary key (Date, Hour, ObjectId, LayerId, ParameterId) 
 with (fillfactor = 80) 
 )

Эту самую жысонину слабать. С учетом того, что этот самый (low|no)-code на шине – то еще чудушко – проще оказалось запихнуть создание json’а в хранимку

И так - тем более не надо!
create procedure getdata(@fr timestamp) 
as 
begin     
    declare
         @points   table
                (                     
                   pointName int not null                 
                )
    declare          
    	 @result  	table                 
    	 (                     
    	 	pointName varchar(10),
    	 	events    varchar(max)                 
	 )     
	 declare   @pointName  int     
	 insert into  @points
	 select distinct ObjectId
	 from dbo.DataHour     
	 where SaveDate > @fr
	 select @pointName = min(pointName)
	      from   @points
	         while @pointName is not null
	                  begin             
	                  	insert into   @pointName           
	                  	values (caste @pointName as varchar(10)), 
 (select 
 value                                                             
 as Value,
 DATEDIFF_BIG(NANOSECOND, '1970-01-01 00:00:00.0000000', SaveDate) as timestamp,
		 0                                                                 
		 as quality,                                                              				null                                                              
		 as annotatiton                                                      
		  from dbo.DataHour                                                       
		  where ObjectId = @pointName                                                      
		  and SaveDate > @fr                                                       
		  for json auto, include_null_values))             
		  select @pointName = min(pointName)             
		  from @points            
		  where pointName >  @pointName       
		  end select JSON_MODIFY((select pointName as [value.pointName], 
		  JSON_QUERY(events) as [value.events] 
		  from  @result  
		  for json path, ROOT('records')), 'append $.value_schema_id', <номер схемы>) 
		  end

Цена вопроса со всем research’ем и приседушками – 4 часа, на описание\согласование решения больше уйдет. Рекомендую ли я это решение к использованию? В том виде, в котором оно описано – скорее «нет», костылей все же многовато – но докрутить до вменяемого состояния можно.

Комментарии (2)


  1. Haverlon
    14.09.2023 08:17

    |Собственно, бинарный протокол kafka’и шина не умеет

    Это простите что за шина такая ?)))


    1. atshaman Автор
      14.09.2023 08:17

      Отечественная. Импортозамещенная. Не-у-ме-ет.

      Возможности платформы DATAREON по интеграции приложений

      Не вляпайтесь ненароком :)