Приведу пример как сконфигурировать несколько endpoints для подключения к IBM MQ.
Цель:
0. Будем считать, что вы на данный момент уже развернули MQ или пользуетесь чьей-то.
1. Подгружаем зависимости в проект:
2. Создаем конфиг, вводим параметры подключения ваших точек (вы же их создали уже?). Используем массив, поэтому подключений может быть сколь угодно много.
3. Создаем классы для считывания этих пропертей:
4. Создаем слушателя:
5. Конфигурируем! Определяем коннекшионФактори для каждого элемента массива из yml-пропертей. Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты. Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories.
6. Создадим сервисный слой, где допустим будет какая-то логика и после отправка ответа.
7. Добавляем отправку ответа в слушатель:
Вуаля, готово, можно проверять.
Цель:
- читать из нескольких очередей, именованных одинаково, но находящихся на разных хостах/администраторах очередей
- писать ответ в рандомно определенную ноду
0. Будем считать, что вы на данный момент уже развернули MQ или пользуетесь чьей-то.
1. Подгружаем зависимости в проект:
maven
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>mq-jms-spring-boot-starter</artifactId>
<version>2.3.3</version>
</dependency>
gradle
compile group: 'com.ibm.mq', name: 'mq-jms-spring-boot-starter', version: '2.3.3'
2. Создаем конфиг, вводим параметры подключения ваших точек (вы же их создали уже?). Используем массив, поэтому подключений может быть сколь угодно много.
mq:
servers:
- queueManager: QM1
channel: DEV.ADMIN.SVRCONN
connName: ibmmq.ru(1414)
user: admin
password: passw0rd
- queueManager: QM2
channel: DEV.ADMIN.SVRCONN
connName: ibmmq.ru(1415)
user: admin
password: passw0rd
queue1: QUEUE1
queue2: QUEUE2
3. Создаем классы для считывания этих пропертей:
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "mq")
@EqualsAndHashCode(callSuper = false)
@Data
public class MqConfig {
private List<ConnectionConfiguration> servers;
private String queue1;
private String queue2;
}
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = false)
public class ConnectionConfiguration {
String queueManager;
String channel;
String connName;
String user;
String password;
}
4. Создаем слушателя:
import javax.jms.MessageListener;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MqListener implements MessageListener {
@SneakyThrows
@Override
public void onMessage(@Payload javax.jms.Message message) {
log.info("Получено сообщение <" + message + ">");
//TODO: сюда добавим отправку ответа чуть позже
}
5. Конфигурируем! Определяем коннекшионФактори для каждого элемента массива из yml-пропертей. Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты. Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.*;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.QosSettings;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.jms.support.converter.SimpleMessageConverter;
import javax.jms.*;
import java.util.*;
import static javax.jms.DeliveryMode.NON_PERSISTENT;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
@Configuration
@EnableJms
@Slf4j
public class MqConfiguration {
@Autowired
MqConfig mqConfig;
@Autowired
private JmsListenerEndpointRegistry registry;
//Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories
@Bean
public List<JmsListenerContainerFactory> myFactories(
@Qualifier("myConnFactories")
List<CachingConnectionFactory> connectionFactories,
MqListener mqListener) {
List<JmsListenerContainerFactory> factories = new ArrayList<>();
connectionFactories.forEach(connectionFactory -> {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);
QosSettings qosSettings = new QosSettings();
qosSettings.setDeliveryMode(NON_PERSISTENT);
factory.setReplyQosSettings(qosSettings);
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint-"+ UUID.randomUUID());
endpoint.setDestination(mqConfig.getQueue1());
endpoint.setMessageListener(mqListener);
registry.registerListenerContainer(endpoint, factory);
factories.add(factory);
});
return factories;
}
//Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты
@Bean
@Qualifier("myJmsTemplates")
public List<JmsTemplate> jmsTemplates(
@Qualifier("myConnFactories")
List<CachingConnectionFactory> connectionFactories) {
return getJmsTemplates(new ArrayList<ConnectionFactory>(connectionFactories));
}
public List<JmsTemplate> getJmsTemplates(List<ConnectionFactory> connectionFactories) {
List<JmsTemplate> jmsTemplates = new ArrayList<>();
for (ConnectionFactory connectionFactory : connectionFactories) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory);
jmsTemplate.setMessageConverter(new SimpleMessageConverter());
jmsTemplate.setDefaultDestinationName(mqConfig.getQueue2());
jmsTemplate.setDeliveryMode(NON_PERSISTENT);
jmsTemplate.setDeliveryPersistent(false);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplates.add(jmsTemplate);
}
return jmsTemplates;
}
//Определяем коннекшионФактори для каждого элемента массива из yml-пропертей
@Bean
@Qualifier("myConnFactories")
public List<CachingConnectionFactory> connectionFactories() throws JMSException {
List<CachingConnectionFactory> factories = new ArrayList<>();
for (ConnectionConfiguration server : mqConfig.getServers()) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
MQConnectionFactory cf = new MQConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(cf);
cf.setQueueManager(server.getQueueManager());
cf.setChannel(server.getChannel());
cf.setConnectionNameList(server.getConnName());
cf.setStringProperty(WMQConstants.USERID, server.getUser());
cf.setStringProperty(WMQConstants.PASSWORD, server.getPassword());
cf.setStringProperty("XMSC_WMQ_CONNECTION_MODE", "1");
factories.add(cachingConnectionFactory);
}
return factories;
}
}
endpoint.setMessageListener(mqListener);Здесь указываем слушателя (которого создали в п.4), чтобы определить действия при приеме сообщения.
6. Создадим сервисный слой, где допустим будет какая-то логика и после отправка ответа.
import javax.jms.TextMessage;
public interface MqService {
void sendToMq(TextMessage msg);
}
import javax.jms.TextMessage;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class MqServiceImpl implements MqService {
@Autowired
private MqConfig mqConfig;
@Autowired
@Qualifier("myJmsTemplates")
List<JmsTemplate> jmsTemplates;
@Override
public void sendToMq(TextMessage msg ) {
//какая-то логика
//рандомным образом определяем в какую ноду/темплейт отправлять сообщение.
int maxIndex = jmsTemplates.size()-1; // Конечное значение диапазона - "до"
int randomNumber = (int) Math.round(Math.random() * maxIndex);
jmsTemplates.get(randomNumber).convertAndSend(mqConfig.getQueue2(), msg);
}
}
7. Добавляем отправку ответа в слушатель:
@Autowired
MqService mqService;
@SneakyThrows
@Override
public void onMessage(@Payload javax.jms.Message message) {
log.info("Получено сообщение <" + message + ">");
mqService.sentToMq((TextMessage) message);
}
Вуаля, готово, можно проверять.
inkelyad
Мне казалось, что в IBM MQ все это должно делается штатным способом через client channel definition table (CCDT). А то что тут описано — это ручное повторение того функционала, что и так в клиентских библиотеках IBM MQ есть.