В статье я опишу способ разработки REST сервиса, позволяющего принимать файлы и сохранять их в систему обмена сообщениями в потоковом режиме без необходимости хранения всего файла на стороне сервиса. Также будет описан обратный сценарий, при котором клиент будет получать в качестве ответа файл, размещенный в системе обмена сообщениями.
Для наглядности я приведу примеры кода разработанного сервиса на JEE7 под сервер приложений IBM WebSphere Liberty Server, а в качестве системы обмена сообщениями будет выступать IBM MQ.
Тем не менее, описанный метод подходит и для других аналогичных платформ, т.е. в качестве системы обмена сообщений может выступать любой поставщик JMS API, а в качестве сервера приложений любой JEE сервер (например, Apache Tomcat).
Постановка задачи
Возникла потребность в реализации решения, которое бы позволяло как получать от клиента файлы большого размера (> 100 Mb) и передавать их в другую территориально удаленную систему, так и в обратную сторону – передавать клиенту в качестве ответа файлы из этой системы. В виду ненадежного сетевого канала между сетью клиента и сетью приложения используется система обмена сообщениями, обеспечивающая гарантированную доставку между ними.
Верхнеуровневое решение включает в себя три компонента:
- REST сервис – задача которого предоставить клиенту возможность передать файл (или запросить).
- MQ – отвечает за передачу сообщений между различными сетями.
- Application – приложение, отвечающее за хранение файлов и выдачу их по запросу.
В этой статье я описываю способ реализации REST сервиса, в задачи которого входит:
- Получение файла от клиента.
- Передача полученного файла в MQ.
- Передача файла из MQ клиенту в качестве ответа.
Метод решения
В виду большого размера передаваемого файла отсутствует возможность размещения его полностью в оперативной памяти, более того, со стороны MQ также накладывается ограничение – максимальный размер одного сообщения в MQ не может превышать 100 Mb. Таким образом мое решение будет основываться на следующих принципах:
- Получение файла и сохранение его в MQ очереди должно выполняться в потоковом режиме, без помещения в память полностью файла.
- В очереди MQ файл будет размещаться в виде набора небольших сообщений.
Графически размещение файла на стороне клиента, REST сервиса и MQ показано ниже:
На стороне клиента файл полностью размещается на файловой системе, в REST-сервисе в оперативной памяти хранится лишь порция файла, а на стороне MQ – каждая порция файла размещается в виде отдельного сообщения.
Разработка REST сервиса
Для наглядности предлагаемого метода решения будет разработан демонстрационный REST сервис, содержащий два метода:
- upload – получает от клиента файл и записывает его в MQ очередь, в качестве ответа возвращает идентификатор группы сообщений (в base64 формате).
- download – получает от клиента идентификатор группы сообщений (в base64 формате) и возвращает файл, хранящийся в MQ очереди.
Метод получения файла от клиента (upload)
В задачу метода входит получение потока входящего файла и последующая запись его в MQ очередь.
Получение потока входящего файла
Для получения входящего файла от клиента, метод ожидает в качестве входящего параметра объект с интерфейсом com.ibm.websphere.jaxrs20.multipart.IMultipartBody, который предоставляет возможность получить ссылку на поток входящего файла
@PUT
@Path("upload")
public Response upload(IMultipartBody body) {
...
IAttachment attachment = body.getAttachment("file");
InputStream inputStream = attachment.getDataHandler().getInputStream();
...
}
Данный интерфейс (IMultipartBody) находится в JAR-архиве com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, входит в поставку к IBM Liberty Server и размещается в папке: <WLP_INSTALLATION_PATH>/dev/api/ibm.
Примечание:
- WLP_INSTALLATION_PATH — путь к директории WebSphere Liberty Profile.
- Ожидается, что клиент будет передавать файл в параметре с именем «file».
- Если используется другой сервер приложений, то можно воспользоваться альтернативной библиотекой от Apache CXF.
Потоковое сохранение файла в MQ
Метод получает на вход поток входящего файла, название MQ очереди, куда следует записать файл, и идентификатор группы сообщений, который будут использоваться для связывания сообщений. Идентификатор группы генерируется на стороне сервиса, например, утилитой org.apache.commons.lang3.RandomStringUtils:
String groupId = RandomStringUtils.randomAscii(24);
Алгоритм сохранения входящего файла в MQ состоит из следующих этапов:
- Инициализация объектов подключения к MQ.
- Цикличное считывание порции входящего файла пока файл не будет полностью считан:
- Порция данных файла записывается в виде отдельного сообщения в MQ.
- Каждое сообщение файла имеет свой порядковый номер (свойство «JMSXGroupSeq»).
- Все сообщения файла имеет одинаковое значение группы (свойство «JMSXGroupID»).
- Последнее сообщение имеет признак, означающий, что это сообщение является завершающим (свойство «JMS_IBM_Last_Msg_In_Group»).
- Константа SEGMENT_SIZE содержит размер порции. Например, 1Mb.
public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException {
try (
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue(queueName));
) {
byte[] buffer = new byte[SEGMENT_SIZE];
BytesMessage message = null;
for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) {
readBytesSize = inputStream.read(buffer);
if (message != null) {
if (readBytesSize < 1) {
message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
} producer.send(message);
}
if (readBytesSize > 0) {
message = session.createBytesMessage();
message.setStringProperty("JMSXGroupID", groupId);
message.setIntProperty("JMSXGroupSeq", sequenceNumber);
if (readBytesSize == SEGMENT_SIZE) {
message.writeBytes(buffer);
} else {
message.writeBytes(Arrays.copyOf(buffer, readBytesSize));
}
}
}
}
}
Метод отправки файла клиенту (download)
Метод получает идентификатор группы сообщений в формате base64, по которому считывает сообщения из MQ очереди и отправляет в качестве ответа в потоковом режиме.
Получение идентификатора группы сообщений
В качестве входящего параметра метод получает идентификатор группы сообщений.
@PUT
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
...
}
Потоковая передача ответа клиенту
Для передачи клиенту файла, хранящемуся в виде набора отдельных сообщений в MQ, в потоковом режиме следует создать класс с интерфейсом javax.ws.rs.core.StreamingOutput:
public class MQStreamingOutput implements StreamingOutput {
private String groupId;
private String queueName;
public MQStreamingOutput(String groupId, String queueName) {
super();
this.groupId = groupId;
this.queueName = queueName;
}
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException {
try {
MQWorker().read(outputStream, queueName, groupId);
} catch(NamingException | JMSException e) {
e.printStackTrace();
new IOException(e);
} finally {
outputStream.flush();
outputStream.close();
}
}
}
В классе реализуем метод write, который получает на вход ссылку на исходящий поток, в который будут записываться сообщения из MQ. Я добавил в класс еще название очереди и идентификатор группы, сообщения которой будут считываться.
Объект этого класса будет передан в качестве параметра для создания ответа клиенту:
@GET
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
ResponseBuilder responseBuilder = null;
try {
MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME);
responseBuilder = Response.ok(streamingOutput);
} catch(Exception e) {
e.printStackTrace();
responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage());
}
return responseBuilder.build();
}
Потоковое считывание файла из MQ
Алгоритм считывания сообщений из MQ в исходящий поток состоит из следующих этапов:
- Инициализация объектов подключения к MQ.
- Цикличное считывание сообщений из MQ пока не будет считано сообщение с признаком завершающего в группе (свойство «JMS_IBM_Last_Msg_In_Group»):
- Перед каждым считыванием сообщения из очереди устанавливается фильтр (messageSelector), в котором задается идентификатор группы сообщений и порядковый номер сообщения в группе.
- Содержимое считанного сообщения записывается в исходящий поток.
public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException {
try(
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
) {
connection.start();
Queue queue = session.createQueue(queueName);
int sequenceNumber = 1;
for(boolean isMessageExist = true; isMessageExist == true; ) {
String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++;
try(
MessageConsumer consumer = session.createConsumer(queue, messageSelector);
) {
BytesMessage message = (BytesMessage) consumer.receiveNoWait();
if (message == null) {
isMessageExist = false;
} else {
byte[] buffer = new byte[(int) message.getBodyLength()];
message.readBytes(buffer);
outputStream.write(buffer);
if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) {
isMessageExist = false;
}
}
}
}
}
}
Вызов REST сервиса
Для проверки работы сервиса я воспользуюсь инструментом curl.
Отправка файла
curl -X PUT -F file=@<путь_к_файлу> http://localhost:9080/Demo/rest/service/upload
В ответ будет получена base64 строка, содержащая идентификатор группы сообщений, которую мы укажем в следующем методе для получения файла.
Получение файла
curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64_строка_идентификатор_группы_сообщений> -o <путь_к_файлу_куда_запишется_ответ>
Заключение
В статье был рассмотрен подход к разработке REST сервиса, позволяющему в потоковом режиме как получать и сохранять большие данные в очередь системы обмена сообщениями, так и считывать их из очереди для возвращения в виде ответа. Такой способ позволяет сократить использование ресурсов, и тем самым увеличить пропускную способность решения.
Дополнительные материалы
Подробнее об интерфейсе IMultipartBody, используемый для получения входящего потока файла — ссылка.
Альтернативная библиотека для получения файлов в потоковом режиме в REST сервисах – Apache CXF.
Интерфейс StreamingOutput для потокового возвращения REST ответа клиенту — ссылка.
Комментарии (10)
panchmp
30.09.2018 22:56какая цель данной статьи?
данный говнокод не пройдет маломальски вменяемое код ревьюNickmetal Автор
01.10.2018 21:34Приходилось встречаться с решениями, в которых отсутствует именно потоковая передача файла, т.е. приложение полностью сохраняет файл в оперативную память прежде чем записать его в конечную систему (например, в MQ очередь). В результате, приложение накладывает ограничение на размер передаваемого файла, а загрузка ресурсов не только снижает пропускную способность, но и может привести к падению системы из-за нехватки памяти.
В частности, такое решение, как IBM Integration Bus до сих пор не позволяет в потоковом режиме сохранить получаемый файл по REST протоколу и записать его в MQ очередь.
В статье хотелось продемонстрировать способ, которым я воспользовался для решения такой задачи. Возможно, кому-то будет полезно.
Throwable
30.09.2018 23:16Ой как все фиговенько!
Во-первых, как-то вы оптимистично обрабатываете нештатные ситуации. Если у вас оборвался InputStream от клиента, вы все-равно должны отослать в очередь пакет JMS_IBM_Last_Msg_In_Group и как-то пометить его как "Exception". В свою очередь reader, прочитав такой пакет, должен будет вместо нормального завершения сгенерить IOException и дать понять консьюмеру на другой стороне, что стрим оборвался.
Во-вторых, ваш reader будет работать только, если весь файл полностью уже лежит в очереди, потому как вы делаете receiveNoWait(). В качестве бонуса такая логика на ура будет выдавать недочитанные файлы, если writer не успел записать блок. Здесь нужно поставить обычный блокирующий read() с каким-нибудь таймаутом, чтобы исключить бесконечную блокировку, если нужный пакет так и не пришел.
В-третьих, приложение-консьюмер каким-то мистическим образом узнает groupId, который ему надо читать. В реале вам нужно сделать бесконечный reader, который постоянно слушает очередь, и сначала запрашивает первый пакет из любой цепочки, узнает из него groupId и уже потом дочитывает остальное из данной цепочки, а затем возвращается на начало. И чтобы расширить пропускную способность, надо бы сделать целый пул из таких reader-ов.
В-четвертых, очередь будет потихоньку забиваться пакетами из оборванных цепочек. Необходима периодическая очистка от старых пакетов.
Ну и наконец, если уж вы используете IBM MQ, то вроде там как бы уже реализован сервис файлтрансфера. А вы тут предлагаете решения для бедных...
Nickmetal Автор
01.10.2018 21:28Спасибо за развернутые замечания и предлагаемые решения!
Согласен с Вами, что приведенный код не отвечает требованиям по надежности в случае нештатных ситуаций. В статье я бы не хотел усложнять код обработкой такого рода ошибок, а как можно проще продемонстрировать библиотеки и классы, используемые для решения такой задачи.
time2rfc
01.10.2018 00:13Очень похоже на контрольную или практическую работу в ВУЗе по слогу и оформлению.
Не понял отчего это REST.
sshikov
По-моему ваш код не скомпилируется.
И еще, что интереснее: вы предполагаете, что сообщения (то есть файлы) будут постоянно храниться в очереди MQ? Если нет, то как они туда попадают, когда клиент запрашивает download?
Мне кажется, что вы обещали какое-то приложение, которое будет хранить файлы, и помещать их в очередь по запросу, но не показали его. А в возможности его реализации есть некоторые сомнения.
Mishiko
Не вникал в детали, но думаю, что у автора все получится (сам делал что то подобное дважды, лет 10 назад — одно решение было очень похоже: большой дамп разбивался на мелкие SOAP-запросы для преодоления файрволов и сети плохого качества, куски дампа пападали в JMS-очередь, откуда перемещались во временную папку, при наличии всех кусков они собирались в файл и раздампливались).
sshikov
>Не вникал в детали
А вы вникните. Еще раз повторю — автор при выполнении download берет файл из очереди MQ. На первый взгляд, есть два варианта:
Оба способа имеют очевидные недостатки.
P.S. Судя по вашему описанию, у вас не было REST клиента, который выгружал бы файлы из системы. А это две большие разницы.
Nickmetal Автор
Прошу прощения если ввел в заблуждение.
Задача статьи состоит в демонстрации способа потоковой передачи файла между REST сервисом и системой обмена сообщениями (MQ). Описанный вначале статьи сценарий служит для иллюстрации ситуации, в которой может возникнуть такая потребность.
Чтобы не усложнять статью и не отвлекаться от её основной задачи был приведен код сервиса, выполняющего минимальный набор операций, позволяющих продемонстрировать каким образом осуществляется потоковое сохранение файла из REST запроса в очередь MQ, а также как можно вернуть в REST-ответ файл, хранящийся в MQ очереди.
В рамках демонстрационного сервиса, метод upload получает файл от клиента и сохраняет его в MQ очередь, в которой эти сообщения дальше никуда не отправляются. Метод download считывает эти сообщения и возвращает в виде ответа клиенту.
В промышленном сценарии сообщения отправляются в целевую сеть, где считываются приложением и записываются в хранилище, например, в систему электронного документооборота, после чего возвращают клиенту, например, идентификатор файла. При запросе файла, клиент передает этот идентификатор REST сервису, который отправляет его через систему обмена сообщениями в приложение, которое считывает из хранилища этот файл и помещает его в MQ, после чего REST-сервис отправляет полученный набор сообщений из MQ в REST ответа в виде файла.
Если более детально рассматривать промышленный сценарий, то это могло бы выглядеть так:
Сценарий отправки файла от клиента:
Сценарий запроса файла:
sshikov
Да, так уже понятно, что именно задумано. Только вот есть подозрения, что так работать будет не очень хорошо. Но это уже сплошная интуиция, тут я не настаиваю.
Дело в неизвестных задержках между выполнением запроса 1 (сценарий запроса файла) и получением ответа в п. 7. Между ними два обмена по MQ, причем второй — это передача самого файла (очевидно, фрагментами, примерно аналогично 1 сценарию).
Все это время ваш REST клиент очевидно ждет. И сервис тоже. Если учесть, что файлы могут быть разного размера, и нагрузка на сеть варьируется, настроить фиксированный таймаут, который бы хорошо работал во всех случаях, будет непросто.
Да, это не значит, что у меня есть готовое решение. Я бы наверное нотификацию клиенту сдалал бы асинхронно, на основе вебсокетов, например. И при ее получении уже загрузил бы сам файл.