Для чего это нужно
Конвертация сообщений между брокером и REST упрощает прием и отправку сообщений без использования нативных протоколов или клиентский приложений брокеров сообщений:
Возможные варианты использования:
- Асинхронное взаимодействие между сервисами. Конвертация REST запросов в сообщения брокера способствует ослаблению связи между сервисами, способствует увеличению производительности и устойчивости к ошибкам 
- Сбор логов. Мобильные приложения могут отправлять логи своей работы через REST в брокер сообщений. 
- Согласование протоколов. Не все приложения имеют возможность взаимодействия через брокеры сообщений, для их интеграции используется конвертация REST-брокер сообщений. 
- Пересечение сегментов. Сегменты предприятия, как правило, разделены и взаимодействуют между собой, используя брокер сообщений. 
В статье мы настроим шлюз с открытым исходным кодом OpenIG для конвертации сообщений брокера в REST и обратно.
Подготовка к работе
Предположим, у вас уже установлен и настроен OpenIG. Если же нет, то как быстро это сделать, описано в статье How To Protect Web Services with OpenIG.
Вы можете так же использовать демонстрационный проект https://github.com/maximthomas/openig-mb-example как стартовую точку
Варианты использования
Отправка HTTP запросов в Apache Kafka
Настройка позволяет получать сообщения по HTTP протоколу и отправлять их в Apache Kafka.
Добавьте в файл конфигурации OpenIG config.json в обработчик Kafka producer:
{
  "heap": [
    ...
    {
      "name": "kafka-producer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "kafka:9092",
        "topic.produce": "incoming-messages"
      }
    },
    ...
  ]
} 
Важные настройки обработчика:
| Настройка | Описание | 
|---|---|
| boostrap.server | Список хотсто и портов Apache Kafka, указанные через запятую | 
| topic.produce | Топик, в который OpenIG отправляет сообщения | 
| topic.consume | Топик, из которого OpenIG читает сообщения | 
| uri | Конечная точка маршрута OpenIG | 
| method | Метод HTTP, который использует OpenIG для отправки запросов по HTTP | 
Добавьте маршрут OpenIG, который получать HTTP запросы и отправлять сообщения в Apache Kafka:
routes/10-http2kafka.json:
{
  "name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}",
  "condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
    "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "kafka-consumer"
            }
          ]
        }
      }
    }
  }
Примеры файлов конфигурации находятся в проекте в директории openig/config
Запустите Docker контейнеры командой
docker compose -f docker-compose.yml up
Создайте топик для Apache Kafka. Пример команды для Docker контейнера:
docker exec openig-mb-example-kafka-1 kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092
Отправьте HTTP запрос в OpenIG и проверьте сообщения в созданном топике:
curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' '<http://localhost:8080/http2kafka>'
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /http2kafka HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 16
> 
* upload completely sent off: 16 out of 16 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Server: Apache-Coyote/1.1
< Content-Length: 0
< Date: Wed, 13 Apr 2022 12:34:03 GMT
< 
* Connection #0 to host localhost left intact
docker exec openig-mb-example-kafka-1 kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server localhost:9092
{"data": "test"}
Отправка сообщений Kafka в HTTP
В следующей конфигурации OpenIG будет получать сообщения и топика topic2 Apache Kafka и отправлять их на конечную точку HTTP.
Потушите Docker контейнеры командой
docker compose -f docker-compose.yml down
Добавьте в файл конфигурации обработчик Kafka producer.
config.json
{
  "heap": [
    ...
    {
      "name": "kafka-consumer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "kafka:9092",
        "topic.consume": "topic2",
        "method": "POST"
      }
    },
    ...
  ]
}
Добавьте в OpenIG маршрут, который будет слушать сообщения из Apache Kafka и перенаправлять их на конечную точку HTTP.
routes/10-kafka2http.json
{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
      "type": "DispatchHandler",
        "config": {
          "bindings": [{
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
          }]
        }
      }
    }
  }
}
Обратите внимание на свойство baseURI . В нем указан URI конечной точки HTTP. Значение берется из системного свойства. указанного в файле docker-compose.yaml -Dendpoint.api=http://sample-service:8080 для сервиса OpenIG
Добавьте в Apache Kafka топик topic2, из которого OpenIG будет читать сообщения и перенаправлять их на конечную точку HTTP.
docker exec openig-mb-example-kafka-1 kafka-topics.sh --create --topic topic2 --bootstrap-server localhost:9092
Отправим тестовые данные в созданный топик:
docker exec -it openig-mb-example-kafka-1 kafka-console-producer.sh --topic topic2 --bootstrap-server localhost:9092
>{"data": "test"}
В логе сервиса sample-service  на конечную точку которого OpenIG перенаправляет сообщения появится запись:
2024-07-05T08:46:37.540Z DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /kafka2http, headers=[correlation-id:"8dd45456-433d-42cb-b992-27047ae75ed9", kafka-offset:"0", kafka-timestamp:"1720169196044", kafka-timestamp-date:"Fri Jul 05 08:46:36 UTC 2024", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/17.0.9)"], payload={"data": "test"}]
Настройка встроенного в OpenIG Apache Kafka
Если в инфраструктуре предприятия нет брокера сообщений, но есть потребность получать и перенаправлять сообщения брокера, то OpenIG предлагает встроенный брокер сообщений. Для использования встроенного Apache Kafka, добавьте в файл конфигурации OpenIG объект EmbeddedKafka
config.json
{
  "heap": [
    ...
      {
        "name": "EmbeddedKafka",
        "type": "EmbeddedKafka",
        "config": {
          "zookeper.port": "${system['zookeper.port']}",
          "security.inter.broker.protocol": "${empty system['keystore.location'] ?'PLAINTEXT':'SSL'}",
          "listeners": "${system['kafka.bootstrap']}",
          "advertised.listeners": "${system['kafka.bootstrap']}",
          "ssl.endpoint.identification.algorithm": "",
          "ssl.enabled.protocols":"TLSv1.2",
          "ssl.keystore.location":"${system['keystore.location']}",
          "ssl.keystore.password":"${empty system['keystore.password']?'changeit':system['keystore.password']}",
          "ssl.key.password":"${empty system['key.password']?'changeit':system['key.password']}",
          "ssl.truststore.location":"${system['truststore.location']}",
          "ssl.truststore.password":"${empty system['truststore.password']?'changeit':system['truststore.password']}"			
        },
    ...
  ]
}
Важные настройки EmbeddedKafka:
| Настройка | Описание | 
|---|---|
| zookeper.port | Порт Zookeper для встроенного Apache Kafka. Если не установлен, Kafra не запустится | 
| listeners | Имена хостов и порты, которые будет слушать встроенный Apache Kafka. | 
| advertised.listeners | Имена хостов и порты клиентов встроенного Apache Kafka. | 
Добавьте Kafka listener в массив heap OpenIG и создайте маршрут, который будет слушать сообщения Kafka и перенаправлять их на конечную точку HTTP (вы можете так же перенаправлять сообщения на другой брокер).
config.json
{
  "heap": [
    ...
      {
      "name": "kafka-consumer",
      "type": "MQ_Kafka",
      "config": {
        "bootstrap.servers": "openig:9092",
        "topic.consume": "topic1",
        "method": "POST",
        "uri": "/kafka2http"
      }
    ...
  ]
}
10-kafka2http.json
{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
      "type": "DispatchHandler",
        "config": {
          "bindings": [{
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
          }]
        }
      }
    }
  }
Запустите OpenIG. Теперь вы можете создать topic и отправлять сообщения в этот topic.
$ kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092
>{"data": "test"}
В тестовом сервисе в логе появится сообщение, перенаправленное OpenIG из брокера на конечную точку HTTP.
2022-04-21 07:26:14.645 DEBUG 1 --- [nio-8080-exec-6] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /kafka2http, headers=[kafka-offset:"29", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]
Интеграция с IBM MQ
Отправка HTTP запросов в IBM MQ
Следующая настройка позволяет получать сообщения по HTTP протоколу и отправлять их в topic IBM MQ:
Добавьте обработчик IBM MQ Consumer в heap в файл конфигурации OpenIG:
config.json
{
  "heap": [
    ...
    {
      "name": "mq-producer",
      "type": "MQ_IBM",
      "config": {
        "XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)",
        "XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN",
        "XMSC_WMQ_QUEUE_MANAGER":"QM1",
        "XMSC_USERID":"app",
        "XMSC_PASSWORD":"passw0rd",
        "topic.produce": "DEV.QUEUE.1"
      }
    },
    ...
  ]
}
Важные настройки IBM MQ:
| Setting | Name | 
|---|---|
| XMSC_WMQ_CONNECTION_NAME_LIST | Адреса брокеров IBM MQ в формате списка именов хостов и портов, указанные через запятую | 
| XMSC_WMQ_CHANNEL | Имя канала IBM MQ, используется для соединения | 
| XMSC_USERID | Имя пользователя IBM MQ | 
| XMSC_PASSWORD | Пароль пользователя IBM MQ | 
| topic.produce | Топик, в который OpenIG должен слать сообщения | 
| topic.consume | Топик, из кторого OpenIG читает сообщения | 
| uri | Конечная точка OpenIG | 
| method | Метод HTTP, который OpenIG использует для отправки запросов на конечную точку HTTP | 
Добавьте маршрут OpenIG в папку routes для обработки HTTP запросов.
10-http2mq.json
{
  "name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}",
  "condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "mq-producer"
            }
          ]
        }
      }
    }
  }
}
Отправьте HTTP запрос в OpenIG и проверьте полученное сообщение в топике DEV.QUEUE.1 IBM MQ:
$ curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' '<http://localhost:8080/http2mq>'
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /http2mq HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 16
> 
* upload completely sent off: 16 out of 16 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Server: Apache-Coyote/1.1
< Content-Length: 0
< Date: Wed, 13 Apr 2022 12:34:03 GMT
< 
* Connection #0 to host localhost left intact
Откройте консоль IBM MQ по адресу  https://localhost:9443/ibmmq/console/. В топике DEV.QUEUE.1 вы увидите полученное сообщение:

Отправка сообщений IBM MQ на конечную точку HTTP
Добавьте IBM MQ cosumer в heap в файл конфигурации OpenIG config.json.
{
  "heap": [
    ...
    {
      "name": "mq-consumer",
      "type": "MQ_IBM",
      "config": {
        "XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)",
        "XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN",
        "XMSC_WMQ_QUEUE_MANAGER":"QM1",
        "XMSC_USERID":"app",
        "XMSC_PASSWORD":"passw0rd",
        "topic.consume": "DEV.QUEUE.2",
        "uri": "/mq2http",
        "method": "POST"
      }
    }
    ...
  ]
}
Добавьте маршрут OpenIG в папку routes для обработки сообщений IBM MQ:
10-mq2http.json
{
  "name": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}",
  "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}",
  "monitor": true,
  "timer": true,
  "handler": {
    "type": "Chain",
    "config": {
      "filters": [],
      "handler": {
        "type": "DispatchHandler",
        "config": {
          "bindings": [
            {
              "handler": "ClientHandler",
              "capture": "all",
              "baseURI": "${system['endpoint.api']}"
            }
          ]
        }
      }
    }
  }
}
Зайдите в консоль IBM MQ и отправьте сообщение в топик DEV.QUEUE.2

В логе сервиса sample-servive вы увидите следующее сообщение:
2022-04-21 08:32:35.007 DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter      : After request [POST /mq2http, headers=[jms_ibm_character_set:"UTF-8", jms_ibm_encoding:"273", jms_ibm_format:"MQSTR", jms_ibm_msgtype:"8", jms_ibm_putappltype:"6", jms_ibm_putdate:"20220421", jms_ibm_puttime:"08323434", jmsxappid:"com.ibm.mq.webconsole", jmsxdeliverycount:"1", jmsxuserid:"unknown", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]
 
          