image


Это третья статья об использовании Scala в тестировании. Сегодня будут рассмотрены примеры использования Apache Camel для создания тестовых заглушек, а также компонентов информационной системы.


Часто возникает необходимость эмулировать работу какой-либо части системы для интеграционного тестирования, сделать заглушку или написать простой компонент интеграции. Это может быть веб-сервис, возвращающий нужные ответы, тест, наполняющий базу данных, приложение, которое считывает сообщение из очереди и возвращает результат обработки, генератор файлов и другие компоненты.


Для разовой проверки интеграции мы бы использовали простое Java или Scala приложение, сценарий Apache JMeter или SoapUI. Но нам нужна система, которая постоянно работает, отвечает на запросы и не требует действий со стороны тестировщика — запустил и забыл. Для решения такой задачи мы можем создать приложение, основанное на фреймворке Apache Сamel.
Рассмотрим 5 примеров:


  1. Чтение файлов в одной кодировке, запись в другой;
  2. Запрос к веб сервису по расписанию и сохранение сообщения в хранилище данных;
  3. Реализация веб-сервиса, который возвращает сообщение в зависимости от параметра GET запроса;
  4. Чтение сообщения из очереди и отправка сообщения в БД;
  5. Пример маршрутизации по содержимому файла.

Кратко опишем инструменты, которые используются для решения задачи. Apache camel (http://camel.apache.org/) — Java фреймворк, предназначенный для реализации обмена сообщениями между отдельными приложениями, подсистемами информационной системы. Реализует подход к разработке связующего программного обеспечения Enterprise Integration Patterns (EIP). Позволяет работать с файлами, БД, менеджерами очередей, веб-сервисами, и другими компонентами — их более 240 видов на странице проекта component. В приложениие Camel описываются так называемые endpoints — конечные точки, и правила преобразования и маршрутизации сообщений между ними.


Компонент Camel реализует конечную точку. Это либо производитель сообщения (Producer), либо потребитель (Consumer). Некоторые компоненты могут реализовывать оба вида точек, к примеру, из файла можно получить сообщение и записать. Некоторые компоненты реализуют только производителя сообщения, например таймер, или потребителя, например вывод в лог.


В ходе работы приложения происходит манипулирование телом сообщения и его заголовками. Порядок работы с Camel следующий:


  1. Описываем источник сообщения (файл, очередь, БД, сервис, таймер и т.п.);
  2. Описываем правила преобразования данных и форматов;
  3. Описываем получателя (получателей) сообщения(файл, очередь, БД, сервис, вывод в консоль и т.п.) и логику маршрутизации;
  4. Запускаем приложение, которое слушает источник, и при появлении сообщения преобразует его и маршрутизирует до получателей.

Для описания правил маршрутизации и преобразования сообщений используются различные языки languages. Для себя мы выбрали Scala DSL scala-dsl-eip, потому что этот язык хорошо подходит для простого и быстрого создания компонентного программного обеспечения. Для Scala используем систему сборки проекта SBT.


Существует отличный пример с чтением сообщения из файла и отправкой его http post запросом. Он немного устаревший, но может быть полезен.


» http://www.lightbend.com/activator/template/camel-http
» https://github.com/hilton/activator-camel-http#master


Подготовительные работы
Создадим проект в idea на основе SBT. Пример создания проекта можно подсмотреть — Реализация мониторинга и интеграционного тестирования информационной системы с использованием Scalatest. Часть1
В файле build.sbt пропишем настройки


name := "camel-scaladsl"
version := "1.0"
scalaVersion := "2.11.8"
val camelVersion = "2.17.1"

libraryDependencies ++= Seq(
  // Компоненты для Camel
  "org.apache.camel" % "camel-core" % camelVersion,
  "org.apache.camel" % "camel-scala" % camelVersion,
  // Для каждого компонента Camel своя зависимость
  "org.apache.camel" % "camel-quartz" % camelVersion,
  "org.apache.camel" % "camel-spring-redis" % camelVersion,
  "org.apache.camel" % "camel-http" % camelVersion,
  "org.apache.camel" % "camel-jetty" % camelVersion,
  "org.apache.camel" % "camel-jms" % camelVersion,
  "org.apache.camel" % "camel-jdbc" % camelVersion,
  // Компоненты для логгирования
  "ch.qos.logback" % "logback-classic" % "1.1.2",
  "org.slf4j" % "slf4j-api" % "1.7.7",
  // Компонент для работы xml в скала
  "org.scala-lang.modules" % "scala-xml_2.11" % "1.0.5",
  // Драйвер БД H2
  "com.h2database" % "h2" % "1.4.192",
  "org.apache.commons" % "commons-dbcp2" % "2.1.1",
  // Драйвер для брокера activemq
  "org.apache.activemq" % "activemq-client" % "5.13.3"
)

Добавим файл src/main/resources файл logback.xml, в котором настроен уровень логгирования и формат сообщения.


<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

Иначе по умолчанию будет уровень DEBUG — выводится слишком много информации.


Пример 1
Чтение файлов в одной кодировке, запись в другой. Это простое приложение, которое использует компонент http://camel.apache.org/file2.html из пакета camel-core. Оно состоит из объекта, запускающего приложение FromFileToFileApp и класса FromFileToFileRoute, в котором описаны маршруты. Класс с маршрутами можно вынести в отдельный файл.


Содержимое файла src/main/scala/FromFileToFileApp.scala


import org.apache.camel.CamelContext
import org.apache.camel.main.Main
import org.apache.camel.scala.dsl.builder.{ScalaRouteBuilder, RouteBuilderSupport}

object FromFileToFileApp extends App with RouteBuilderSupport {
//Создаем Camel Main класс и контекст в нем
val mainApp = new Main
  val context = mainApp.getOrCreateCamelContext
  // Привязываем классы с маршрутами
  mainApp.addRouteBuilder(new FromFileToFileRoute(context))
  // Запускаем
  mainApp.run
}

class FromFileToFileRoute(context: CamelContext) extends ScalaRouteBuilder(context) {
  // Читаем содержимое файла в одной кодировке из папки "inbox"
  """file:inbox?charset=utf-8""" ==> {
    // Пишем в другой кодировке в директорию "outbox"
    to ("file:outbox?charset=Windows-1251")
  }
}

В классе FromFileToFileRoute не происходит никаких преобразований с содержимым сообщения, отсутствует маршрутизация. После запуска приложения в папке проекта будут автоматически созданы папки "inbox", "outbox". При попадании в директорию "inbox", файл автоматически считывается — исчезает из папки. Затем он появляется в директории "outbox" в другой кодировке. При этом в папке "inbox" в отдельной подпапке будут храниться сообщения, прочитанные Сamel.


Пример 2
Запрос к веб-сервису по расписанию и сохранение сообщения в хранилище данных. В этом примере по таймеру будем собирать даные о курсе валют и отправлять в Redis. Для того, чтобы выпонить действия над сообщением (записать тело и заголовки), существует метод "process". Для Redis отправка значений производится с помощью пары заголовков "CamelRedis.Key"/"CamelRedis.Value". Нам необходимо извлечь тело сообщения, которое возвращает HTTP GET запрос и сделать его заголовком "CamelRedis.Value".


Ключ будем генерировать уникальный, подходящий для сортировки — текущее время в миллисекундах.


import org.apache.camel.{Exchange, CamelContext}
import org.apache.camel.main.Main
import org.apache.camel.scala.dsl.builder.{ScalaRouteBuilder, RouteBuilderSupport}
import org.springframework.data.redis.serializer.StringRedisSerializer

object FromHTTPToRedisApp extends App with RouteBuilderSupport{
  val mainApp = new Main
  // Прописываем вместо стандартного кастомный stringSerializer для Redis
  mainApp.bind("stringSerializer",new StringRedisSerializer)
  val context = mainApp.getOrCreateCamelContext
  mainApp.addRouteBuilder(new FromHTTPToRedisRoute(context))
  mainApp.run
}

class FromHTTPToRedisRoute (context: CamelContext) extends ScalaRouteBuilder(context) {
  // По таймеру, раз в минуту обращаемся к HTTP сервису
  """quartz:timerName?cron=0+0/1+*+*+*+?""" ==> {

    // Вывод информации в консоль
    log("Запрос к сервису")
    // Запрос к сервису
    to("http://www.google.com/finance/info?q=CURRENCY%3aUSDRUB")
    // Создание пары ключ-значение для Кedis, запись в заголовок
    process((exchange: Exchange) => {
      exchange.getOut.setHeader("CamelRedis.Key",System.currentTimeMillis())
      exchange.getOut.setHeader("CamelRedis.Value",exchange.getIn.getBody(classOf[String]))
    })
    // Логгирование через отправку в конечную точку позволяет просмотреть сообщение и его атрибуты
    // В данном случае тело сообщения будет пусто (Body: [Body is null]])
    to("log:FromHTTPToRedisApp")
    // Отправляем данные в Redis
    // #stringSerializer - объявленный нами ранее кастомный сериалайзер
    to("""spring-redis://172.16.7.58:6379?serializer=#stringSerializer""")
  }
}

Чтобы писать в Redis с удаленного хоста, может понадобиться разрешение. К примеру, в консоли Redis на хосте, где он запущен, выполнить команду


CONFIG SET protected-mode no

Пример отображения записей в Redis представлен на рисунке.


image


Пример 3
Реализация веб-сервиса, который возвращает сообщение в зависимости от параметра GET запроса. В данном примере с помощью компонента Jetty реализуем простой HTTP сервер, который получает GET запрос с параметром и возвращает xml со значением параметра, либо с ошибкой.


object JettyApp extends App with RouteBuilderSupport{ 
  val mainApp = new Main
  val context = mainApp.getOrCreateCamelContext
  mainApp.addRouteBuilder(new JettyRoute(context))
  mainApp.run
}

class JettyRoute(context: CamelContext) extends ScalaRouteBuilder(context) {
  // Определяем порт и адрес сервиса
  """jetty:http://0.0.0.0:1234/myapp/myservice""" ==> {
    delay(2 seconds)
    process((exchange: Exchange) => {      
      // Извлекаем значение параметра uuid из get запроса к сервису
      val uuidParam = exchange.getIn.getHeader("uuid")
      // Определяем паттерн для параметра
      val pattern = """[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}""".r
      // Возвращаем ответ в зависимости от извлеченного значения
      // Возможен случай отсутсвия параметра, а также несоответствия значения шаблону uuid
      def responseText = uuidParam match {
        case null => "Uuid parameter not found"
        case pattern() => s"$uuidParam"
        case _ => s"Uuid parameter format is not valid"
      }
      // Определяем тип возвращаемого контента как xml
      exchange.getOut().setHeader(Exchange.CONTENT_TYPE,"text/xml; charset=utf-8")
      // Возвращаем xml с ответом.
      exchange.getOut().setBody(<uuid>{responseText}</uuid>)
      // Вариант отправки параметра как строки s"<uuid>$responseText</uuid>" тоже рабочий
    })
  }
}

Примеры запросов для проверки:
» http://localhost:1234/myapp/myservice?uuid=2a577d52-e5a1-4da5-96e5-bdba1f68e6f1;
» http://localhost:1234/myapp/myservice?uuid=123;
» http://localhost:1234/myapp/myservice;
» http://localhost:1234/myapp/myservice?guid=2a577d52-e5a1-4da5-96e5-bdba1f68e6f.


Примеры ответов сервиса представлены на рисунке


image


Пример 4
Чтение сообщения из очереди и запись в БД. Работа с очередями и БД была выделена в отдельный пример. Настройка этих компонентов требует иного подхода. Если в предыдущих примерах настройка проводилась с помощью параметров в строке endpoint, то здесь нужно заранее создать объект, сделать на его основе компонент и использовать далее.


Для БД создаем экземпляр класса org.apache.commons.dbcp2.BasicDataSource и передаем ему параметры подключения. Для очереди создаем экземпляр класса javax.jms.ConnectionFactory, в котором также сохраняем параметры подключения. Далее для этих компонентов создается имя для конечной точки, и используется в URI. Разница в том, что для БД используется компонент "camel-jdbc", а для очередей создается новый компонент на основе "camel-jms".


Таблица, в которую происходит вставка записи в примерах, создается следующим запросом:


CREATE TABLE MESSAGETABLE(
 ID UUID NOT NULL PRIMARY KEY,
 DATETIME TIMESTAMP,
 BODY VARCHAR(65536)

Следующий код будет забирать сообщения из очереди, выполнять в БД запрос на добавление уникального идентификатора, времени и тела сообщения.


import java.text.SimpleDateFormat
import java.util.{UUID, Date}
import org.apache.camel.component.jms.JmsComponent
import org.apache.camel.main.Main
import org.apache.camel.scala.dsl.builder.{RouteBuilderSupport, ScalaRouteBuilder}
import org.apache.camel.{CamelContext, Exchange}
// Для создания подключения к БД импортируем BasicDataSource
import org.apache.commons.dbcp2.BasicDataSource
// Для работы с месседж-брокером импортируем соответствующий ConnectionFactory класс
import org.apache.activemq.ActiveMQConnectionFactory

object FromMQToDBApp extends App with RouteBuilderSupport {
  val mainApp = new Main
  // Для работы с БД создаем объект и передаем ему свойства соединения
  val ds = new BasicDataSource
  ds.setDriverClassName("org.h2.Driver")
  ds.setUrl("jdbc:h2:./h2db")
  // Добавляем endpoint в приложение, далее в названии получателя будем использовать "h2db"
  mainApp.bind("h2db",ds)
  // Для работы с очередью создаем MQConnectionFactory
  val cf = new ActiveMQConnectionFactory("tcp://192.168.3.38:61616")
  // Создаем компонент для работы с очередью
  mainApp.bind("amq-jms", JmsComponent.jmsComponentAutoAcknowledge(cf))
  val context = mainApp.getOrCreateCamelContext
  mainApp.addRouteBuilder(new FromMQToDBAppRoute(context))
  mainApp.run
}

 // Класс реализует чтение сообщения из очереди и запись его в БД
class FromMQToDBAppRoute(context: CamelContext) extends ScalaRouteBuilder(context) {
  // Читаем сообщение из очереди. Компонент называется также, как мы его назвали ранее - "amq-jms", имя очереди передается как параметр
  // Для каждого менеджера очередей необходимо создавать свой компонент
  """amq-jms:queue:TESTQ""" ==> {

    process((exchange: Exchange) => {
      // Генериуем uuid, дату/время
      val uuid = UUID.randomUUID
      val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
      // Извлекаем тело сообщения
      val messageBody = exchange.getIn.getBody(classOf[String])
      // Формируем запрос с параметрами
      exchange.getOut.setBody(s"INSERT INTO PUBLIC.MESSAGETABLE (ID, DATETIME, BODY) VALUES('$uuid', '$time', '$messageBody')")
    })
    // Отправляем подготовленный запрос в БД
    // Компонент называется jdbc, далее указывается конкретный DataSource
    to("jdbc:h2db")
  }
}

Следует помнить о том, что при попытке записи в БД сообщения больше длины поля (у таблицы, созданной запросом ранее, длина поля — 65536 символов) – возникнет ошибка. Ее можно решить, обрезая тело до нужного размера, либо добавив errorHandler(deadLetterChannel("file:error")), который будет отправлять сообщения, приводящие к ошибкам, в папку «error».


В примере рассмотрено взаимодействие с базой данных H2. Для других БД нужно добавить соответствующую библиотеку в build.sbt, определить имя класса драйвера, URL. Могут понадобиться другие свойства подключения, к примеру, имя пользователя и пароль.


Пример описания реквизитов подключения для работы с Postgresql:


Добавление библиотеки в build.sbt


      libraryDependencies += "org.postgresql" % "postgresql" % "9.4.1207"

Реализация в классе:


    val ds = new BasicDataSource {
      setDriverClassName("org.postgresql.Driver")
      setUrl(conf.getString("jdbc:postgresql://myhost:5432/mydb"))
      setUsername(conf.getString("myusername"))
      setPassword(conf.getString("mypassword"))
      }

С очередями несколько сложнее. Для некоторых из менеджеров очередей, библиотеки не открыты для доступа в репозиториях. В этом случае, используются *.jar файлы, которые хранятся в папке lib проекта.


Для любого менеджера очередей нужно создать соответствующий объект типа connection factory.
К примеру, код, обеспечивающий взаимодействие с IBM Websphere MQ, будет таким:


    val cf = new MQQueueConnectionFactory {
      setHostName("myhost")
      setPort(1414)
      setTransportType(1)
      setQueueManager("myqmname")
      setChannel("mychannel")
    }

Для Oracle Weblogic Jms еще интереснее. Если создать очереди по иструкции How to Create a Simple JMS Queue in Weblogic Server 11g, то объявление компонента будет таким:


  val env = new util.Hashtable[String, String]
  env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory")
  env.put(javax.naming.Context.PROVIDER_URL, "t3://myhost:7001")
  val ic: InitialContext = new InitialContext(env)
  val connectionFactory = ic.lookup("jms/TestConnectionFactory").asInstanceOf[QueueConnectionFactory]
  // где jms/TestConnectionFactory - jndi для ConnectionFactory"
  mainApp.bind("ora-jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))

а endpoint URI будет такого формата: "ora-jms:queue:./TestJMSModule!TestJMSQueue", где ./ обозначает текущий сервер, "TestJMSModule" JNDI имя модуля "TestJMSQueue" — JNDI имя очереди


Пример 5
Маршрутизация по содержимому файла. В данном примерерассмотрим маршрутизацию сообщения в зависимости от его содержимого.


Предположим, что на входе имеется xml-сообщение, обработка которого зависит от значения элемента "То".


<To>ActiveMQ</To> — нужно отправить в очередь, а <To>H2</To> — обработать каким-то образом и отправить в БД, <To>someAdress</To> — обработать еще каким-то образом.


В сообщение будет добавлен заголовок "Destination" с именем конечной точки, в которую надо будет отправить сообщение.


Если возникнет ошибка при обработке сообщения или в таблице маршрутизации не будет соответствующего значения, то отправляем сообщение в "direct:trash".


В примере используется конструкция скала "???", которая позволяет заменить несуществующий блок кода для успешной компиляции. Вместо этого блока нужно написать логику обработки.


import org.apache.camel.{Exchange, CamelContext}
import org.apache.camel.scala.dsl.builder.ScalaRouteBuilder
import scala.xml.XML

class ContentOrientedRouting(context: CamelContext) extends ScalaRouteBuilder(context) {

  // При ошибках обработки сообщения, отправляем его в "direct:trash"
  errorHandler(deadLetterChannel("direct:trash"))

  // Опишем таблицу маршрутизации в виде Map
  val destMap = Map(
    "ActiveMQ"    -> "jms-amq:queue:inbox",
    "H2"               -> "direct:h2db",
    "someAdress"  -> "direct:outbox")
  // Вынесем обработку в отдельную функцию
  val addRoutingAction = (exchange: Exchange) => {
    // Получим значение тега "To" из XML-файла, который пришел на вход
    val body = exchange.getIn.getBody(classOf[String])
    val xmlBody = XML.loadString(body)
    val toValue = (xmlBody \\ "To").text
    // Получим имя endpoint, если такого значения нет - отправляем в "direct:trash"
    val dest = destMap.getOrElse(toValue,"direct:trash")
    // Устанавливаем значение заголовка
    exchange.getOut.setHeader("Destination", dest)
  }

  """direct:inbox1""" ==> {
    process(addRoutingAction)
    // извлекаем из заголовка "Destination" endpoint и отправляем туда сообщение
    recipients(_.in("Destination"))
  }
  // Описываем логику для разных endpoint
  """jms-amq:queue:inbox""" ==> {???}

  """direct:h2db""" ==> {
    process((exchange: Exchange) => {???})
    to ("jdbc:h2db")
  }

"""direct:outbox""" ==> {
  // Параллельная отправка сообщения в файл и в лог
  to("file:someFile", "log:Somelog")
}

  """direct:trash""" ==> {???}
}

Примеры показывают, как можно реализовать небольшое приложение для наших целей. Рассмотрим дополнительные аспекты. позволяющие сделать разработку и обслуживание приложения удобнее.


Для конфигурации приложения используем бибилитеку от Typesafe, чтобы не зашивать параметры подключения в коде, а хранить в конфигурационном файле.


В build.sbt добавляем:


libraryDependencies += "com.typesafe" % "config" % "1.3.0"

в папке src/main/resources создаем файл application.conf, в котором прописываем настройки и вызываем их из кода.


Запуск приложения выполняется командой sbt run. В некоторых случаях это может быть неудобно.
Возможно создание jar-файла с помощью плагина sbt-assembly https://github.com/sbt/sbt-assembly для запуска командой java –jar camelapp.jar. В .jar-файле будут содержаться все зависимости, поэтому размер будет большой, но запуск происходит сразу, без скачивания компонентов.


Для запуска в фоне удобно использовать приложение nohup.


Создаем скрипт для запуска в папке, которая входит в переменную среды $PATH, чтобы вызывать по имени из любой директории. К примеру в /usr/local/bin/. Скрипт для запуска:


/usr/local/bin/camelstart


#!/bin/bash
/usr/bin/nohup java -jar /opt/camelapp.jar&

Для остановки:
/usr/local/bin/camelstop


#!/bin/bash
pkill -f camelapp

Запуск приложения делается командой camelstart, остановка — camelstop.


Можем выделить некоторые плюсы и минусы использования Apache Camel.
Плюсы:


  • Быстрая реализация приложений;
  • Большое количество готовых компонентов;
  • Многопоточность, параллельная обработка сообщений из коробки;
  • Возможность выбора способа описания в виде XML или одного из DSL;

Минусы:


  • У каждого компонента своя логика работы, требуется время на понимание;
  • Существует порог входа;

Кроме того, поскольку Apache Camel работает на JVM, приложениям, созданным на его основе, присущи плюсы и минусы этой платформы.


Опыт использования Apache Camel в связке со ScalaDSL в нашей компании показал его эффективность для создания заглушек, компонентов интеграции а иногда и нагрузочных тестов.

Поделиться с друзьями
-->

Комментарии (3)


  1. beTrue
    31.08.2016 18:14

    Уважаемый автор, подскажите плиз, в чем конкретно здесь профит от использования AC, всё необходимое для EIP идет из коробки, которую нужно освоить? т.е. не требуется кодить различную агрегацию, а достаточно взять её из AC и связать через DSL, верно?
    имхо, для таких задач дешевле использовать акторы, т.к. все умеют в аккторы кто пишет на scala, интересно, есть ли у вас опыт использования акторов + AC?


  1. sshikov
    31.08.2016 19:20

    Уважаемый автор, подскажите плиз, в чем конкретно здесь профит от использования AC, всё необходимое для EIP идет из коробки, которую нужно освоить? т.е. не требуется кодить различную агрегацию, а достаточно взять её из AC и связать через DSL, верно?

    Я не автор, но да — верно. У нас много Camel в проектах, впечатления за несколько лет более чем положительные.


    для таких задач дешевле использовать акторы,

    Ну это несколько спорное утверждение. Может все и умеют акторы, но функциональность Camel и акторов не идентична. Есть еще развесистый набор endpoint, пусть он не всегда идеален, но покрывает множество типичных задач.


  1. S_T_I_M
    01.09.2016 12:24

    Спасибо за статью, хотелось бы запустить все примеры, есть ссылки на гитхаб?