Привет, Хабр!

В статье я опишу способ разработки REST сервиса, позволяющего принимать файлы и сохранять их в систему обмена сообщениями в потоковом режиме без необходимости хранения всего файла на стороне сервиса. Также будет описан обратный сценарий, при котором клиент будет получать в качестве ответа файл, размещенный в системе обмена сообщениями.

Для наглядности я приведу примеры кода разработанного сервиса на JEE7 под сервер приложений IBM WebSphere Liberty Server, а в качестве системы обмена сообщениями будет выступать IBM MQ.
Тем не менее, описанный метод подходит и для других аналогичных платформ, т.е. в качестве системы обмена сообщений может выступать любой поставщик JMS API, а в качестве сервера приложений любой JEE сервер (например, Apache Tomcat).

Постановка задачи


Возникла потребность в реализации решения, которое бы позволяло как получать от клиента файлы большого размера (> 100 Mb) и передавать их в другую территориально удаленную систему, так и в обратную сторону – передавать клиенту в качестве ответа файлы из этой системы. В виду ненадежного сетевого канала между сетью клиента и сетью приложения используется система обмена сообщениями, обеспечивающая гарантированную доставку между ними.

Верхнеуровневое решение включает в себя три компонента:

  1. REST сервис – задача которого предоставить клиенту возможность передать файл (или запросить).
  2. MQ – отвечает за передачу сообщений между различными сетями.
  3. Application – приложение, отвечающее за хранение файлов и выдачу их по запросу.

image

В этой статье я описываю способ реализации REST сервиса, в задачи которого входит:

  • Получение файла от клиента.
  • Передача полученного файла в MQ.
  • Передача файла из MQ клиенту в качестве ответа.

Метод решения


В виду большого размера передаваемого файла отсутствует возможность размещения его полностью в оперативной памяти, более того, со стороны MQ также накладывается ограничение – максимальный размер одного сообщения в MQ не может превышать 100 Mb. Таким образом мое решение будет основываться на следующих принципах:

  • Получение файла и сохранение его в MQ очереди должно выполняться в потоковом режиме, без помещения в память полностью файла.
  • В очереди MQ файл будет размещаться в виде набора небольших сообщений.

Графически размещение файла на стороне клиента, REST сервиса и MQ показано ниже:

image

На стороне клиента файл полностью размещается на файловой системе, в REST-сервисе в оперативной памяти хранится лишь порция файла, а на стороне MQ – каждая порция файла размещается в виде отдельного сообщения.

Разработка REST сервиса


Для наглядности предлагаемого метода решения будет разработан демонстрационный REST сервис, содержащий два метода:

  • upload – получает от клиента файл и записывает его в MQ очередь, в качестве ответа возвращает идентификатор группы сообщений (в base64 формате).
  • download – получает от клиента идентификатор группы сообщений (в base64 формате) и возвращает файл, хранящийся в MQ очереди.

Метод получения файла от клиента (upload)


В задачу метода входит получение потока входящего файла и последующая запись его в MQ очередь.

Получение потока входящего файла


Для получения входящего файла от клиента, метод ожидает в качестве входящего параметра объект с интерфейсом com.ibm.websphere.jaxrs20.multipart.IMultipartBody, который предоставляет возможность получить ссылку на поток входящего файла

@PUT
@Path("upload")
public Response upload(IMultipartBody body) {
	...
	IAttachment attachment = body.getAttachment("file");
	InputStream inputStream = attachment.getDataHandler().getInputStream();
	...
}

Данный интерфейс (IMultipartBody) находится в JAR-архиве com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, входит в поставку к IBM Liberty Server и размещается в папке: <WLP_INSTALLATION_PATH>/dev/api/ibm.

Примечание:

  • WLP_INSTALLATION_PATH — путь к директории WebSphere Liberty Profile.
  • Ожидается, что клиент будет передавать файл в параметре с именем «file».
  • Если используется другой сервер приложений, то можно воспользоваться альтернативной библиотекой от Apache CXF.

Потоковое сохранение файла в MQ


Метод получает на вход поток входящего файла, название MQ очереди, куда следует записать файл, и идентификатор группы сообщений, который будут использоваться для связывания сообщений. Идентификатор группы генерируется на стороне сервиса, например, утилитой org.apache.commons.lang3.RandomStringUtils:

String groupId = RandomStringUtils.randomAscii(24);

Алгоритм сохранения входящего файла в MQ состоит из следующих этапов:

  1. Инициализация объектов подключения к MQ.
  2. Цикличное считывание порции входящего файла пока файл не будет полностью считан:
    1. Порция данных файла записывается в виде отдельного сообщения в MQ.
    2. Каждое сообщение файла имеет свой порядковый номер (свойство «JMSXGroupSeq»).
    3. Все сообщения файла имеет одинаковое значение группы (свойство «JMSXGroupID»).
    4. Последнее сообщение имеет признак, означающий, что это сообщение является завершающим (свойство «JMS_IBM_Last_Msg_In_Group»).
    5. Константа SEGMENT_SIZE содержит размер порции. Например, 1Mb.

public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException {
	try (
		Connection connection = connectionFactory.createConnection();
		Session session = connection.createSession();
		MessageProducer producer = session.createProducer(session.createQueue(queueName));
	) {
		byte[] buffer = new byte[SEGMENT_SIZE];
		BytesMessage message = null;
		for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) {
			readBytesSize = inputStream.read(buffer);
			if (message != null) {
				if (readBytesSize < 1) {
					message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
				}	producer.send(message);
			}
			if (readBytesSize > 0) {
				message = session.createBytesMessage();
				message.setStringProperty("JMSXGroupID", groupId);
				message.setIntProperty("JMSXGroupSeq", sequenceNumber);
				if (readBytesSize == SEGMENT_SIZE) {
					message.writeBytes(buffer);
				} else {
					message.writeBytes(Arrays.copyOf(buffer, readBytesSize));
				}
			}
		}
	}
}

Метод отправки файла клиенту (download)


Метод получает идентификатор группы сообщений в формате base64, по которому считывает сообщения из MQ очереди и отправляет в качестве ответа в потоковом режиме.

Получение идентификатора группы сообщений


В качестве входящего параметра метод получает идентификатор группы сообщений.

@PUT
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
	...
}

Потоковая передача ответа клиенту


Для передачи клиенту файла, хранящемуся в виде набора отдельных сообщений в MQ, в потоковом режиме следует создать класс с интерфейсом javax.ws.rs.core.StreamingOutput:

public class MQStreamingOutput implements StreamingOutput {

	private String groupId;
	private String queueName;
	
	public MQStreamingOutput(String groupId, String queueName) {
		super();
		this.groupId = groupId;
		this.queueName = queueName;
	}

	@Override
	public void write(OutputStream outputStream) throws IOException, WebApplicationException {
		try {
			MQWorker().read(outputStream, queueName, groupId);
		} catch(NamingException | JMSException e) {
			e.printStackTrace();
			new IOException(e);
		} finally {
			outputStream.flush();
			outputStream.close();
		}

	}
}

В классе реализуем метод write, который получает на вход ссылку на исходящий поток, в который будут записываться сообщения из MQ. Я добавил в класс еще название очереди и идентификатор группы, сообщения которой будут считываться.

Объект этого класса будет передан в качестве параметра для создания ответа клиенту:

@GET
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
	ResponseBuilder responseBuilder = null;
	try {
		MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME);
		responseBuilder = Response.ok(streamingOutput);	
	} catch(Exception e) {
		e.printStackTrace();
	responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage());
	}
	return responseBuilder.build();
}

Потоковое считывание файла из MQ


Алгоритм считывания сообщений из MQ в исходящий поток состоит из следующих этапов:

  1. Инициализация объектов подключения к MQ.
  2. Цикличное считывание сообщений из MQ пока не будет считано сообщение с признаком завершающего в группе (свойство «JMS_IBM_Last_Msg_In_Group»):
    1. Перед каждым считыванием сообщения из очереди устанавливается фильтр (messageSelector), в котором задается идентификатор группы сообщений и порядковый номер сообщения в группе.
    2. Содержимое считанного сообщения записывается в исходящий поток.


public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException {
	try(
		Connection connection = connectionFactory.createConnection();
		Session session = connection.createSession();
	) {
		connection.start();
		Queue queue = session.createQueue(queueName);
		int sequenceNumber = 1;
		for(boolean isMessageExist = true; isMessageExist == true; ) {
			String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++;
			try(
				MessageConsumer consumer = session.createConsumer(queue, messageSelector);
					) {
				BytesMessage message = (BytesMessage) consumer.receiveNoWait();
				if (message == null) {
					isMessageExist = false;
				} else {
					byte[] buffer = new byte[(int) message.getBodyLength()];
					message.readBytes(buffer);
					outputStream.write(buffer);
					if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) {
						isMessageExist = false;
					}
				}
			}
		}
	}
}

Вызов REST сервиса


Для проверки работы сервиса я воспользуюсь инструментом curl.

Отправка файла


curl -X PUT -F file=@<путь_к_файлу> http://localhost:9080/Demo/rest/service/upload

В ответ будет получена base64 строка, содержащая идентификатор группы сообщений, которую мы укажем в следующем методе для получения файла.

Получение файла


curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64_строка_идентификатор_группы_сообщений> -o <путь_к_файлу_куда_запишется_ответ>

Заключение


В статье был рассмотрен подход к разработке REST сервиса, позволяющему в потоковом режиме как получать и сохранять большие данные в очередь системы обмена сообщениями, так и считывать их из очереди для возвращения в виде ответа. Такой способ позволяет сократить использование ресурсов, и тем самым увеличить пропускную способность решения.

Дополнительные материалы


Подробнее об интерфейсе IMultipartBody, используемый для получения входящего потока файла — ссылка.

Альтернативная библиотека для получения файлов в потоковом режиме в REST сервисах – Apache CXF.

Интерфейс StreamingOutput для потокового возвращения REST ответа клиенту — ссылка.

Комментарии (10)


  1. sshikov
    30.09.2018 22:48

    По-моему ваш код не скомпилируется.

    И еще, что интереснее: вы предполагаете, что сообщения (то есть файлы) будут постоянно храниться в очереди MQ? Если нет, то как они туда попадают, когда клиент запрашивает download?

    Мне кажется, что вы обещали какое-то приложение, которое будет хранить файлы, и помещать их в очередь по запросу, но не показали его. А в возможности его реализации есть некоторые сомнения.


    1. Mishiko
      01.10.2018 11:11

      Не вникал в детали, но думаю, что у автора все получится (сам делал что то подобное дважды, лет 10 назад — одно решение было очень похоже: большой дамп разбивался на мелкие SOAP-запросы для преодоления файрволов и сети плохого качества, куски дампа пападали в JMS-очередь, откуда перемещались во временную папку, при наличии всех кусков они собирались в файл и раздампливались).


      1. sshikov
        01.10.2018 18:59

        >Не вникал в детали
        А вы вникните. Еще раз повторю — автор при выполнении download берет файл из очереди MQ. На первый взгляд, есть два варианта:

        • он там хранится
        • он туда попадает после того, как REST сервер пошлет запрос приложению, и оно файл в очередь загрузит.

        Оба способа имеют очевидные недостатки.

        P.S. Судя по вашему описанию, у вас не было REST клиента, который выгружал бы файлы из системы. А это две большие разницы.


    1. Nickmetal Автор
      01.10.2018 21:27

      Прошу прощения если ввел в заблуждение.

      Задача статьи состоит в демонстрации способа потоковой передачи файла между REST сервисом и системой обмена сообщениями (MQ). Описанный вначале статьи сценарий служит для иллюстрации ситуации, в которой может возникнуть такая потребность.

      Чтобы не усложнять статью и не отвлекаться от её основной задачи был приведен код сервиса, выполняющего минимальный набор операций, позволяющих продемонстрировать каким образом осуществляется потоковое сохранение файла из REST запроса в очередь MQ, а также как можно вернуть в REST-ответ файл, хранящийся в MQ очереди.

      В рамках демонстрационного сервиса, метод upload получает файл от клиента и сохраняет его в MQ очередь, в которой эти сообщения дальше никуда не отправляются. Метод download считывает эти сообщения и возвращает в виде ответа клиенту.

      В промышленном сценарии сообщения отправляются в целевую сеть, где считываются приложением и записываются в хранилище, например, в систему электронного документооборота, после чего возвращают клиенту, например, идентификатор файла. При запросе файла, клиент передает этот идентификатор REST сервису, который отправляет его через систему обмена сообщениями в приложение, которое считывает из хранилища этот файл и помещает его в MQ, после чего REST-сервис отправляет полученный набор сообщений из MQ в REST ответа в виде файла.

      Если более детально рассматривать промышленный сценарий, то это могло бы выглядеть так:
      Сценарий отправки файла от клиента:

      1. Клиент отправляет файл в REST сервис.
      2. REST сервис в потоковом режиме сохраняет файл в очередь MQ «CONTENT.TO.STORE» в виде набора сообщений.
      3. REST сервис формирует сообщение, указывая мета информацию о файле (например, наименование, размер, идентификатор группы сообщений), и отправляет в очередь MQ «REQUEST.TO.STORE».
      4. MQ передает эти сообщения в целевые очереди, находящуюся в территориально удаленной сети.
      5. На той стороне приложение Application прослушивает очередь «REQUEST.TO.STORE» (например, через Message-Driven Bean), получая мета информацию о пересланном файле.
      6. Приложение по полученному идентификатору группы сообщений считывает из очереди «CONTENT.TO.STORE» содержимое файла и сохраняет его в своем хранилище.
      7. Приложение формирует сообщение с идентификатором файла, которое было присвоено в этом хранилище, и отправляет его в очередь MQ «RESPONSE.TO.STORE».
      8. MQ передает это сообщение обратно в сеть, где размещен REST сервис, который ожидает ответного сообщения из очереди «RESPONSE.TO.STORE».
      9. REST сервис, получив ответ – возвращает клиенту идентификатор файла.


      Сценарий запроса файла:
      1. Клиент отправляет в REST сервис идентификатор файла.
      2. REST сервис формирует сообщение, указывая полученный идентификатор файла и отправляет в очередь MQ «REQUEST.TO.RETRIEVE».
      3. MQ передает сообщение в сеть, где размещается приложение Application.
      4. Приложение Application, получив сообщение из очереди «REQUEST.TO.RETRIEVE», по полученному идентификатору считывает файл из хранилища и записывает в очередь MQ «CONTENT.TO.RETRIEVE».
      5. Приложение Application формирует сообщение с метаинформацией о файле (имя, размер, идентификатор группы сообщений) и отправляет в очередь «RESPONSE.TO.RETRIEVE».
      6. MQ передает эти сообщения обратно в сеть, где размещен REST сервис, ожидающий ответного сообщения из очереди «RESPONSE.TO.RETRIEVE».
      7. REST сервис считывает по идентификатору группы сообщений содержимое файла из очереди «CONTENT.TO.RETRIEVE» и отправляет в виде ответа клиенту.


      1. sshikov
        01.10.2018 22:35

        Да, так уже понятно, что именно задумано. Только вот есть подозрения, что так работать будет не очень хорошо. Но это уже сплошная интуиция, тут я не настаиваю.

        Дело в неизвестных задержках между выполнением запроса 1 (сценарий запроса файла) и получением ответа в п. 7. Между ними два обмена по MQ, причем второй — это передача самого файла (очевидно, фрагментами, примерно аналогично 1 сценарию).

        Все это время ваш REST клиент очевидно ждет. И сервис тоже. Если учесть, что файлы могут быть разного размера, и нагрузка на сеть варьируется, настроить фиксированный таймаут, который бы хорошо работал во всех случаях, будет непросто.

        Да, это не значит, что у меня есть готовое решение. Я бы наверное нотификацию клиенту сдалал бы асинхронно, на основе вебсокетов, например. И при ее получении уже загрузил бы сам файл.


  1. panchmp
    30.09.2018 22:56

    какая цель данной статьи?
    данный говнокод не пройдет маломальски вменяемое код ревью


    1. Nickmetal Автор
      01.10.2018 21:34

      Приходилось встречаться с решениями, в которых отсутствует именно потоковая передача файла, т.е. приложение полностью сохраняет файл в оперативную память прежде чем записать его в конечную систему (например, в MQ очередь). В результате, приложение накладывает ограничение на размер передаваемого файла, а загрузка ресурсов не только снижает пропускную способность, но и может привести к падению системы из-за нехватки памяти.

      В частности, такое решение, как IBM Integration Bus до сих пор не позволяет в потоковом режиме сохранить получаемый файл по REST протоколу и записать его в MQ очередь.

      В статье хотелось продемонстрировать способ, которым я воспользовался для решения такой задачи. Возможно, кому-то будет полезно.


  1. Throwable
    30.09.2018 23:16

    Ой как все фиговенько!


    Во-первых, как-то вы оптимистично обрабатываете нештатные ситуации. Если у вас оборвался InputStream от клиента, вы все-равно должны отослать в очередь пакет JMS_IBM_Last_Msg_In_Group и как-то пометить его как "Exception". В свою очередь reader, прочитав такой пакет, должен будет вместо нормального завершения сгенерить IOException и дать понять консьюмеру на другой стороне, что стрим оборвался.


    Во-вторых, ваш reader будет работать только, если весь файл полностью уже лежит в очереди, потому как вы делаете receiveNoWait(). В качестве бонуса такая логика на ура будет выдавать недочитанные файлы, если writer не успел записать блок. Здесь нужно поставить обычный блокирующий read() с каким-нибудь таймаутом, чтобы исключить бесконечную блокировку, если нужный пакет так и не пришел.


    В-третьих, приложение-консьюмер каким-то мистическим образом узнает groupId, который ему надо читать. В реале вам нужно сделать бесконечный reader, который постоянно слушает очередь, и сначала запрашивает первый пакет из любой цепочки, узнает из него groupId и уже потом дочитывает остальное из данной цепочки, а затем возвращается на начало. И чтобы расширить пропускную способность, надо бы сделать целый пул из таких reader-ов.


    В-четвертых, очередь будет потихоньку забиваться пакетами из оборванных цепочек. Необходима периодическая очистка от старых пакетов.


    Ну и наконец, если уж вы используете IBM MQ, то вроде там как бы уже реализован сервис файлтрансфера. А вы тут предлагаете решения для бедных...


    1. Nickmetal Автор
      01.10.2018 21:28

      Спасибо за развернутые замечания и предлагаемые решения!

      Согласен с Вами, что приведенный код не отвечает требованиям по надежности в случае нештатных ситуаций. В статье я бы не хотел усложнять код обработкой такого рода ошибок, а как можно проще продемонстрировать библиотеки и классы, используемые для решения такой задачи.


  1. time2rfc
    01.10.2018 00:13

    Очень похоже на контрольную или практическую работу в ВУЗе по слогу и оформлению.
    Не понял отчего это REST.