Для чего это нужно

Конвертация сообщений между брокером и REST упрощает прием и отправку сообщений без использования нативных протоколов или клиентский приложений брокеров сообщений:

Возможные варианты использования:

  • Асинхронное взаимодействие между сервисами. Конвертация REST запросов в сообщения брокера способствует ослаблению связи между сервисами, способствует увеличению производительности и устойчивости к ошибкам

  • Сбор логов. Мобильные приложения могут отправлять логи своей работы через REST в брокер сообщений.

  • Согласование протоколов. Не все приложения имеют возможность взаимодействия через брокеры сообщений, для их интеграции используется конвертация REST-брокер сообщений.

  • Пересечение сегментов. Сегменты предприятия, как правило, разделены и взаимодействуют между собой, используя брокер сообщений.

В статье мы настроим шлюз с открытым исходным кодом OpenIG для конвертации сообщений брокера в REST и обратно.

Подготовка к работе

Предположим, у вас уже установлен и настроен OpenIG. Если же нет, то как быстро это сделать, описано в статье How To Protect Web Services with OpenIG.

Вы можете так же использовать демонстрационный проект https://github.com/maximthomas/openig-mb-example как стартовую точку

Варианты использования

Отправка HTTP запросов в Apache Kafka

Настройка позволяет получать сообщения по HTTP протоколу и отправлять их в Apache Kafka.

Добавьте в файл конфигурации OpenIG config.json в обработчик Kafka producer:

{
  "heap": [
    ...
    {
      "name": "kafka-producer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "kafka:9092",
        "topic.produce": "incoming-messages"
      }
    },
    ...
  ]
} 

Важные настройки обработчика:

Настройка

Описание

boostrap.server

Список хотсто и портов Apache Kafka, указанные через запятую

topic.produce

Топик, в который OpenIG отправляет сообщения

topic.consume

Топик, из которого OpenIG читает сообщения

uri

Конечная точка маршрута OpenIG

method

Метод HTTP, который использует OpenIG для отправки запросов по HTTP

Добавьте маршрут OpenIG, который получать HTTP запросы и отправлять сообщения в Apache Kafka:

routes/10-http2kafka.json:

{
  "name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}",
  "condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
    "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "kafka-consumer"
            }
          ]
        }
      }
    }
  }

Примеры файлов конфигурации находятся в проекте в директории openig/config

Запустите Docker контейнеры командой

docker compose -f docker-compose.yml up

Создайте топик для Apache Kafka. Пример команды для Docker контейнера:

docker exec openig-mb-example-kafka-1 kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092

Отправьте HTTP запрос в OpenIG и проверьте сообщения в созданном топике:

curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' '<http://localhost:8080/http2kafka>'
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /http2kafka HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 16
> 
* upload completely sent off: 16 out of 16 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Server: Apache-Coyote/1.1
< Content-Length: 0
< Date: Wed, 13 Apr 2022 12:34:03 GMT
< 
* Connection #0 to host localhost left intact
docker exec openig-mb-example-kafka-1 kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server localhost:9092
{"data": "test"}

Отправка сообщений Kafka в HTTP

В следующей конфигурации OpenIG будет получать сообщения и топика topic2 Apache Kafka и отправлять их на конечную точку HTTP.

Потушите Docker контейнеры командой

docker compose -f docker-compose.yml down

Добавьте в файл конфигурации обработчик Kafka producer.

config.json

{
  "heap": [
    ...
    {
      "name": "kafka-consumer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "kafka:9092",
        "topic.consume": "topic2",
        "method": "POST"
      }
    },
    ...
  ]
}

Добавьте в OpenIG маршрут, который будет слушать сообщения из Apache Kafka и перенаправлять их на конечную точку HTTP.

routes/10-kafka2http.json

{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
      "type": "DispatchHandler",
        "config": {
          "bindings": [{
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
          }]
        }
      }
    }
  }
}

Обратите внимание на свойство baseURI . В нем указан URI конечной точки HTTP. Значение берется из системного свойства. указанного в файле docker-compose.yaml -Dendpoint.api=http://sample-service:8080 для сервиса OpenIG

Добавьте в Apache Kafka топик topic2, из которого OpenIG будет читать сообщения и перенаправлять их на конечную точку HTTP.

docker exec openig-mb-example-kafka-1 kafka-topics.sh --create --topic topic2 --bootstrap-server localhost:9092

Отправим тестовые данные в созданный топик:

docker exec -it openig-mb-example-kafka-1 kafka-console-producer.sh --topic topic2 --bootstrap-server localhost:9092
>{"data": "test"}

В логе сервиса sample-service на конечную точку которого OpenIG перенаправляет сообщения появится запись:

2024-07-05T08:46:37.540Z DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /kafka2http, headers=[correlation-id:"8dd45456-433d-42cb-b992-27047ae75ed9", kafka-offset:"0", kafka-timestamp:"1720169196044", kafka-timestamp-date:"Fri Jul 05 08:46:36 UTC 2024", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/17.0.9)"], payload={"data": "test"}]

Настройка встроенного в OpenIG Apache Kafka

Если в инфраструктуре предприятия нет брокера сообщений, но есть потребность получать и перенаправлять сообщения брокера, то OpenIG предлагает встроенный брокер сообщений. Для использования встроенного Apache Kafka, добавьте в файл конфигурации OpenIG объект EmbeddedKafka

config.json

{
  "heap": [
    ...
      {
        "name": "EmbeddedKafka",
        "type": "EmbeddedKafka",
        "config": {
          "zookeper.port": "${system['zookeper.port']}",
          "security.inter.broker.protocol": "${empty system['keystore.location'] ?'PLAINTEXT':'SSL'}",
          "listeners": "${system['kafka.bootstrap']}",
          "advertised.listeners": "${system['kafka.bootstrap']}",
          "ssl.endpoint.identification.algorithm": "",
          "ssl.enabled.protocols":"TLSv1.2",
          "ssl.keystore.location":"${system['keystore.location']}",
          "ssl.keystore.password":"${empty system['keystore.password']?'changeit':system['keystore.password']}",
          "ssl.key.password":"${empty system['key.password']?'changeit':system['key.password']}",
          "ssl.truststore.location":"${system['truststore.location']}",
          "ssl.truststore.password":"${empty system['truststore.password']?'changeit':system['truststore.password']}"			
        },
    ...
  ]
}

Важные настройки EmbeddedKafka:

Настройка

Описание

zookeper.port

Порт Zookeper для встроенного Apache Kafka. Если не установлен, Kafra не запустится

listeners

Имена хостов и порты, которые будет слушать встроенный Apache Kafka.

advertised.listeners

Имена хостов и порты клиентов встроенного Apache Kafka.

Добавьте Kafka listener в массив heap OpenIG и создайте маршрут, который будет слушать сообщения Kafka и перенаправлять их на конечную точку HTTP (вы можете так же перенаправлять сообщения на другой брокер).

config.json

{
  "heap": [
    ...
      {
      "name": "kafka-consumer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "openig:9092",
        "topic.consume": "topic1",
        "method": "POST",
        "uri": "/kafka2http"
      }
    ...
  ]
}

10-kafka2http.json

{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
      "type": "DispatchHandler",
        "config": {
          "bindings": [{
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
          }]
        }
      }
    }
  }

Запустите OpenIG. Теперь вы можете создать topic и отправлять сообщения в этот topic.

$ kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092
>{"data": "test"}

В тестовом сервисе в логе появится сообщение, перенаправленное OpenIG из брокера на конечную точку HTTP.

2022-04-21 07:26:14.645 DEBUG 1 --- [nio-8080-exec-6] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /kafka2http, headers=[kafka-offset:"29", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]

Интеграция с IBM MQ

Отправка HTTP запросов в IBM MQ

Следующая настройка позволяет получать сообщения по HTTP протоколу и отправлять их в topic IBM MQ:

Добавьте обработчик IBM MQ Consumer в heap в файл конфигурации OpenIG:

config.json

{
  "heap": [
    ...
    {
      "name": "mq-producer",
      "type": "MQ_IBM",
      "config": {
        "XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)",
        "XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN",
        "XMSC_WMQ_QUEUE_MANAGER":"QM1",
        "XMSC_USERID":"app",
        "XMSC_PASSWORD":"passw0rd",
        "topic.produce": "DEV.QUEUE.1"
      }
    },
    ...
  ]
}

Важные настройки IBM MQ:

Setting

Name

XMSC_WMQ_CONNECTION_NAME_LIST

Адреса брокеров IBM MQ в формате списка именов хостов и портов, указанные через запятую

XMSC_WMQ_CHANNEL

Имя канала IBM MQ, используется для соединения

XMSC_USERID

Имя пользователя IBM MQ

XMSC_PASSWORD

Пароль пользователя IBM MQ

topic.produce

Топик, в который OpenIG должен слать сообщения

topic.consume

Топик, из кторого OpenIG читает сообщения

uri

Конечная точка OpenIG

method

Метод HTTP, который OpenIG использует для отправки запросов на конечную точку HTTP

Добавьте маршрут OpenIG в папку routes для обработки HTTP запросов.

10-http2mq.json

{
  "name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}",
  "condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "mq-producer"
            }
          ]
        }
      }
    }
  }
}

Отправьте HTTP запрос в OpenIG и проверьте полученное сообщение в топике DEV.QUEUE.1 IBM MQ:

$ curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' '<http://localhost:8080/http2mq>'
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /http2mq HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 16
> 
* upload completely sent off: 16 out of 16 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Server: Apache-Coyote/1.1
< Content-Length: 0
< Date: Wed, 13 Apr 2022 12:34:03 GMT
< 
* Connection #0 to host localhost left intact

Откройте консоль IBM MQ по адресу  https://localhost:9443/ibmmq/console/. В топике DEV.QUEUE.1 вы увидите полученное сообщение:

Отправка сообщений IBM MQ на конечную точку HTTP

Добавьте IBM MQ cosumer в heap в файл конфигурации OpenIG config.json.

{
  "heap": [
    ...
    {
      "name": "mq-consumer",
      "type": "MQ_IBM",
      "config": {
        "XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)",
        "XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN",
        "XMSC_WMQ_QUEUE_MANAGER":"QM1",
        "XMSC_USERID":"app",
        "XMSC_PASSWORD":"passw0rd",
        "topic.consume": "DEV.QUEUE.2",
        "uri": "/mq2http",
        "method": "POST"
      }
    }
    ...
  ]
}

Добавьте маршрут OpenIG в папку routes для обработки сообщений IBM MQ:

10-mq2http.json

{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
            }
          ]
        }
      }
    }
  }
}

Зайдите в консоль IBM MQ и отправьте сообщение в топик DEV.QUEUE.2

В логе сервиса sample-servive вы увидите следующее сообщение:

2022-04-21 08:32:35.007 DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /mq2http, headers=[jms_ibm_character_set:"UTF-8", jms_ibm_encoding:"273", jms_ibm_format:"MQSTR", jms_ibm_msgtype:"8", jms_ibm_putappltype:"6", jms_ibm_putdate:"20220421", jms_ibm_puttime:"08323434", jmsxappid:"com.ibm.mq.webconsole", jmsxdeliverycount:"1", jmsxuserid:"unknown", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]

Комментарии (0)