В предыдущей статье мы использовали приложение MiniFIX для подключения и отправки сообщений на тестовую биржу с помощью протокола FIX. В этой статье напишем собственную реализацию клиента для получения рыночных данных в виде небольшого SpringBoot-приложения. Код доступен в репозитории.


Для реализации приложения нам понадобится:


  • Java 8
  • Maven
  • Spring boot 2.2.5
  • Lombok
  • QuickFix/J

Содержание для упрощения навигации по статье:



FIX-Engine и запуск тестового сервера


FIX-Engine, или FIX-движок, обеспечивает связь со сторонними системами по протоколу FIX. Он отвечает за преобразование данных в FIX-сообщения, а также за создание сессии и обеспечение ее работы: проверку валидности сообщений, генерацию контрольных сумм, восстановление работы после потери связи и т.д (здесь можно почитать более подробно).


В нашем случае в роли такого движка выступает QuickFix/J. В предыдущей части я использовала пример Executor из модуля examples, но в нем обрабатываются только сообщения на создание торговых заявок. В этом же модуле есть более подходящий пример — OrderMatch (quickfixj-examples-ordermatch), в нем помимо поддержки торговых заявок присутствует обработка сообщений на получение рыночных данных (MarketDataRequest).


Когда вы первый раз клонируете репозиторий, обязательно нужно выполнить сборку проекта, чтобы сгенерировались FIX-сообщения для различных версий протокола. В Readme проекта есть описание команд для различных видов сборки (с тестами и без), самый быстрый:


mvn clean package -Dmaven.javadoc.skip=true -DskipTests -PskipBundlePlugin

Процесс сборки длился у меня где-то минут 6-7, так что в это время можно заварить себе чашечку чая изучить настройки сервера и приступить к написанию клиента.


Как я уже описывала ранее, открываем файл resources/quickfix.examples.ordermatch/ordermatch.cfg, проверяем SocketAcceptPort и заполняем поле TargetCompID нужным значением для нашего клиента (можно оставить BANZAI, которое указано по умолчанию, можно написать любое другое на ваше усмотрение):


[default]
FileStorePath=target/data/ordermatch
DataDictionary=FIX42.xml
SocketAcceptPort=9876 // порт для подключения
BeginString=FIX.4.2 // версия FIX 4.2

[session]
SenderCompID=EXEC // идентификатор сервера
TargetCompID=FIX_CLIENT // идентификатор клиента
ConnectionType=acceptor
StartTime=00:00:00
EndTime=00:00:00

Если хотите поменять значение идентификатора клиента, то лучше, конечно, сделать это перед сборкой, чтобы не пришлось собирать еще раз.


Когда сборка завершится, заходим в quickfixj\quickfixj-examples\ordermatch\target, проверяем, что там появились *.jar файлы:


Запускаем файл quickfixj-examples-ordermatch-2.2.0-SNAPSHOT-standalone.jar, так как он содержит все необходимые для запуска зависимости:


java -jar quickfixj-examples-ordermatch-2.2.0-SNAPSHOT-standalone.jar


Если появилась запись "Started QFJ Message Processor" – значит, сервер запустился. Проверьте, что в строке "Listening for connections at … [FIX4.2:EXEC->FIX_CLIENT]" указано нужное значение идентификатора клиента.


Структура проекта


Вот так выглядит готовый проект (стандартная структура веб-приложений: сервисы, контроллеры, модельки и т.д):



Создаем maven-проект со стандартными зависимостями и добавляем библиотеку QuickFix/J для работы с протоколом FIX:


<properties>
   <quickfixj.version>2.0.0</quickfixj.version>
</properties>

<dependency>
   <groupId>org.quickfixj</groupId>
   <artifactId>quickfixj-core</artifactId>
   <version>${quickfixj.version}</version>
</dependency>

<dependency>
   <groupId>org.quickfixj</groupId>
   <artifactId>quickfixj-messages-fix42</artifactId>
   <version>${quickfixj.version}</version>
</dependency>

Я подключила 2 модуля: quickfixj-core и quickfixj-messages-fix42 для работы с сообщениями только версии FIX 4.2.


Если в вашем приложении предполагаются сообщения различных версий протокола, можно подключить quickfixj-core + quickfixj-messages-all или просто quickfixj-all.

Полная версия pom.xml доступна в репозитории.


Настройка параметров подключения


По аналогии с файлом настроек на сервере, создадим файл resources/config/client.cfg с настройками нашего приложения.


В файле может быть один блок [default], в котором находятся параметры, общие для всех сессий, и несколько блоков [session] для описания параметров конкретной сессии (если сервер поддерживает сообщения различных версий протокола FIX, то для каждой версии создается отдельный блок [session]).


[default]
SenderCompID=FIX_CLIENT // идентификатор отправителя
TargetCompID=EXEC // идентификатор получателя
ConnectionType=initiator // приложение является клиентом
NonStopSession=Y
SocketConnectHost=localhost
ReconnectInterval=5
HeartBtInt=30
FileStorePath=target/data/banzai
UseDataDictionary=Y
DataDictionary=dictionary/fix4_2.xml
ValidateUserDefinedFields=N
AllowUnknownMsgFields=Y
ValidateUserDefinedFields=N
AllowUnknownMsgFields=Y

[session]
BeginString=FIX.4.2
ResetOnLogon=Y

Подробнее о параметрах

Начнем с блока [default]:


  • параметры сессии
    SenderCompID, TargetCompID – идентификатор отправителя и получателя сообщений соответственно (sender – наше приложение, target – сервер). Убедитесь, что эти значения совпадают со значениями параметров на сервере.
    ConnectionType (initiator/acceptor) – указывает, является наше приложение клиентом или сервером.
    — С помощью параметров StartTime и EndTime можно указать время начала и соответственно завершения работы сессии (например, биржа работает с 9.00 до 18.00, поэтому нет смысла запускать сессию вне этого времени). Если же сессия будет работать весь день, то можно указать NonStopSession=Y, что будет равносильно варианту StartTime=00:00:00 и EndTime=00:00:00.
    -параметры валидации сообщений
    UseDataDictionary=Y – можно использовать словарь сообщений, если вы работаете с биржей, спецификация сообщений которой отличается от стандартной (например, в словаре можно указать дополнительные теги или типы сообщений). При этом использование словаря обязательно, если есть сообщения с повторяющимися группами.
    DataDictionary – путь к словарю.
  • параметры клиента
    ReconnectInterval – интервал переподключения к серверу (в секундах).
    HeartBtInt – интервал проверочных сообщений типа HeartBeat (в секундах).
    LogonTimeout, LogoutTimeout – время ожидания Logon и Logout сообщений перед отключением сессии (в секундах).
    SocketConnectHost, SocketConnectPort – хост и порт подключения к acceptor-у.
  • параметры хранения сообщений и логов
    Сообщения и логи можно хранить в файлах или в базе данных (сообщения можно нигде не хранить, если выставить параметр PersistMessages=N).
    Я указала FileStorePath=target/data/banzai для хранения сообщений и номеров последовательностей в файле. Можно указать параметры базы данных (JdbcURL, JdbcUser, JdbcPassword и т.д), тогда сообщения будут храниться в базе данных.

В настройках конкретной сессии (в блоке [session]) главное – заполнить параметр BeginString, в котором указывается версия протокола FIX, использующегося в сообщениях.


Любые настройки можно указывать непосредственно при создании подключения в коде с помощью класса SessionSettings.


Подробнее о конфигурации клиента можно почитать в официальной документации.


Создание FIX-приложения


Теперь перейдем непосредственно к коду клиента. Чтобы создать FIX-приложение, нам нужно просто реализовать интерфейс Application:


public interface Application {
  void onCreate(SessionID sessionId);
  void onLogon(SessionID sessionId);
  void onLogout(SessionID sessionId);
  void toAdmin(Message message, SessionID sessionId);
  void toApp(Message message, SessionID sessionId)
    throws DoNotSend;
  void fromAdmin(Message message, SessionID sessionId)
    throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon;
  void fromApp(Message message, SessionID sessionId)
    throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType;
}

Эти методы вызываются в результате событий, происходящих в приложении (подробнее).


Метод fromApp срабатывает при получении сообщений с сервера, то есть в нем происходит основная логика. Остальные методы в основном служебные. Для удобства я создала абстрактный базовый класс BaseFixService, который реализует служебные методы интерфейса Application, и его наследника FixClientService, который занимается обработкой сообщений с сервера и соответственно реализует метод fromApp.


В приложении может быть установлено несколько сессий, поэтому в базовом классе будем хранить все сессии:


Map<SessionID, Session> sessions = new HashMap<>();

Так как метод onCreate срабатывает при создании новой сессии, в нем будем сохранять сессию по полученному ID с помощью метода lookupSession:


@Override
public void onCreate(SessionID sessionId) {
   log.info(">> onCreate for session: {}", sessionId);
   Session session = Session.lookupSession(sessionId);
   if (session != null) {
       sessions.put(sessionId, session);
   } else {
       log.warn("Requested session is not found.");
   }
}

Когда сессия отключается от сервера (мы завершили сеанс сообщением Logout или произошли какие-то технические проблемы и связь оборвалась), мы удаляем её из нашего хранилища.


@Override
public void onLogout(SessionID sessionId) {
   log.info(">> onLogout for session: {}", sessionId);
   sessions.remove(sessionId);
}

В FixClientService у нас находится главный обработчик сообщений – метод fromApp:


@Override
public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
   try {
       String type = MessageUtils.getMessageType(message.toString()); // получение типа сообщения
       switch (type) {
           case MARKET_DATA_SNAPSHOT_FULL_REFRESH:
               log.info("MarketData message: {}", message);
               break;
           case SECURITY_DEFINITION:
               log.info("SecurityDefinition message: {}", message);
               break;
           default:
               log.info("Unhandled message {} of type: {}", message, type);
       }
   } catch (Exception ex) {
       log.debug("Unexpected exception while processing message.", ex);
   }
}

С помощью класса MessageUtils библиотеки QuickFix/J можно получить тип входящего сообщения и далее обработать каждый случай (здесь для примера я указала несколько типов сообщений и вывела их в лог). В этой статье реализуем получение рыночных данных и их сохранение в кэш, остальные типы сообщений и их обработку более подробно разберем в следующих статьях и дополним логику нашего клиента.


Создание сервиса для подключения к серверу


Когда мы создали реализацию FIX-приложения, можно приступить к сервису для подключения к серверу – ConnectorService. При запуске приложения он будет создавать и запускать сокет для обмена сообщениями.


Для обмена сообщениями нужно создать SocketInitiator (на сервере аналогично создается SocketAcceptor). При создании передаются следующие параметры:


  • Application – FIX-приложение (т.е. класс, реализующий интерфейс Application, FixClientService в нашем случае)
  • MessageStoreFactory – способ хранения сообщений, это может быть, например, JdbcStoreFactory (хранение в базе данных), MemoryStoreFactory (хранение в памяти), FileStoreFactory (хранение в файле).
  • SessionSettings – настройки сессии, для их создания нужно передать файл с настройками (либо его название, либо InputStream).
  • LogFactory – хранение логов (аналогично сообщениям это может быть FileLogFactory, JdbcLogFactory), я использовала SLF4JLogFactory.
  • MessageFactory – используется для создания сообщений (можно использовать DefaultMessageFactory или MessageFactory для конкретной версии протокола FIX).

Путь к файлу настроек и дополнительные параметры (хост и порт подключения) для удобства я вынесла в конфигурацию приложения (application.yaml):


fix:
 cfg: 'classpath:config/client.cfg'
 socketConnectHost: localhost
 socketConnectPort: 9876

Соответственно при создании настроек сессии я использую этот файл и с помощью метода sessionSettings.set(String key, String value) добавляю параметры SocketConnectHost, SocketConnectPort:


try (InputStream inputStream = config.getCfg().getInputStream()) {
   SessionSettings sessionSettings = new SessionSettings(inputStream);
   sessionSettings.setString("SocketConnectHost", config.getSocketConnectHost());
   sessionSettings.setString("SocketConnectPort", config.getSocketConnectPort());

   MessageStoreFactory storeFactory = new FileStoreFactory(sessionSettings);
   SLF4JLogFactory logFactory = new SLF4JLogFactory(sessionSettings);
   MessageFactory messageFactory = new DefaultMessageFactory();

   socketInitiator = new SocketInitiator(fixClientService, storeFactory, sessionSettings, logFactory, messageFactory);
   socketInitiator.start();
} catch (Exception ex) {
   log.error("Exception while establishing connection to FIX server.", ex);
   throw new FixClientException("Exception while establishing connection to FIX server.", ex);
}

После создания настроек сессии объявляем LogFactory, MessageFactory, MessageStoreFactory и передаем их в конструктор SocketInitiator. Вызвав метод start() запустим подключение и сможем получать сообщения.


Не забудьте закрыть сокет при завершении работы с помощью метода stop().

Отправка запроса на получение рыночных данных


Когда приложение запустится и установится соединение с сервером, мы сможем отправлять и получать сообщения. Так как взаимодействие у нас построено на сокетах, отправка сообщения и получение ответа на него происходят асинхронно. Поэтому у нас будет два контроллера:


  • для инициации отправки сообщений
  • для получения данных, сохраненных в результате обработки ответов на отправленные ранее сообщения.

Чтобы получить рыночные данные (например, цену покупки, цену продажи инструмента), нам нужно отправить сообщение-запрос на данные и соответственно обработать ответное сообщение в методе fromApp.



Напишем метод для создания сообщения типа MarketDataRequest (о тегах сообщения можно почитать в спецификации).


public static Message createMarketDataRequest(String symbol) {
}

В библиотеке QuickFix/J все сообщения представляют собой классы, поля в которых соответствуют тегам. Можно создать экземпляр класса нужного нам сообщения и с помощью метода set() заполнить теги. Теги также представляют собой классы с обязательным полем FIELD, в котором хранится соответствующее числовое значение.


Например, тег symbol=55:


public class Symbol extends StringField {
   public static final int FIELD = 55;

   // constructors
}

Стандартные теги (соответствующие спецификации конкретной версии FIX) обычно можно заполнить напрямую. Например, для сообщения MarketDataRequest (далее буду сокращенно писать MDR) определены методы


set(SubscriptionRequestType value)
set(MarketDepth value)
set(Symbol value)
// ...

Если же при работе с конкретной биржей в сообщении присутствуют дополнительные теги, их можно задать с помощью общего метода setField(int key, Field<?> field): например, setField(5020, new IntField(10)) — добавим в сообщение тег <5020> со значением 10: 5020=10.


Создадим объект класса MarketDataRequest:


private static int mdReqID = 1;

MarketDataRequest marketDataRequest = new MarketDataRequest(
       new MDReqID(format("FixClient-%s", mdReqID++)),
       new SubscriptionRequestType(SNAPSHOT), //263
       new MarketDepth(1) //264, 1 = top of book
);

Подробнее о параметрах в конструкторе

В конструкторе передается три параметра:


  • MDReqID – уникальный в рамках данной сессии идентификатор сообщения.
  • SubscriptionRequestType:
    SNAPSHOT = '0' (текущие данные);
    SNAPSHOT_PLUS_UPDATES = '1' (текущие данные + подписка на обновление данных; при выборе этого типа каждый раз при изменении рыночных данных для инструмента, сервер будет отправлять сообщение типа MarketDataSnapshotFullRefresh с новыми данными);
    DISABLE_PREVIOUS_SNAPSHOT_PLUS_UPDATE_REQUEST = '2' (отписка от получения данных + получение обновленных данных);
  • MarketDepth – глубина рынка для типа SNAPSHOT (цены формируются исходя из размещенных и ожидающих размещения заявок на покупку и продажу инструмента, эти заявки записываются в “книгу” заявок. Если указываем параметр равным 0 – будут учитываться все значения “книги”, если 1 – только “верхние” значения).

То же самое в виде сообщения: 262=FixClient-1 263=0 264=1.


Далее нужно указать параметры, которые мы хотим получить в результате запроса рыночных данных. Некоторые параметры в FIX-сообщениях задаются группами. При этом начинается такая часть сообщения с тега, в котором указывается количество последующих групп. В нашем случае параметр <267> NoMdEntryTypes хранит количество групп, а сами группы формируются из тегов <269> MdEntryType. Например, 269=0 означает, что мы хотим получить цену, по которой можно продать инструмент (Bid), а 269=1 – цену, по которой мы сможем купить инструмент (Ask, или Offer). Полный список стандартных значений этого тега можно посмотреть в спецификации. QuickFix/J автоматически заполняет в теге <267> количество параметров, мы можем только заполнить нужные нам поля и добавить каждую группу в сообщение:


   MarketDataRequest.NoMDEntryTypes group = new MarketDataRequest.NoMDEntryTypes(); //267

   group.set(new MDEntryType(MDEntryType.BID));
   marketDataRequest.addGroup(group);
   group.set(new MDEntryType(MDEntryType.OFFER));
   marketDataRequest.addGroup(group);

В сообщении будет выглядеть так: 267=2 269=0 269=1.


Можно делать MDR сразу для нескольких инструментов, в поле <146> NoRelatedSum передается их количество и далее заполняются группы тегов для каждого инструмента. Для простого запроса достаточно передать идентификатор инструмента в теге <55> (для более сложных запросов на фьючерсы или опционы нужно указывать дополнительные параметры, но для нашего базового случая это не нужно).


MarketDataRequest.NoRelatedSym instrument = new MarketDataRequest.NoRelatedSym();
instrument.set(new Symbol(symbol));
marketDataRequest.addGroup(instrument);

В сообщении: 146=1 55=AAPL.


Наш полученный MDR теперь можно отправить на сервер с помощью метода session.send():


@Override
public void sendMarkedDataRequest(String symbol) {
   sessions.forEach((sessionID, session) ->
           session.send(MsgUtils.createMarketDataRequest(symbol)));
}

Аналогично можно реализовать методы отправки любого другого сообщения (на создание заявки, на получение детальной информации об инструменте и т.д).


Для полученного метода отправки запроса на получение рыночных данных создадим REST endpoint, чтобы мы могли инициировать его отправку:


@PostMapping(value = "/market-data-request")
public void sendMarketDataRequest(@RequestParam("symbol") String symbol) {
   fixClientService.sendMarkedDataRequest(symbol);
}

Так будет выглядеть запрос, чтобы создать и отправить сообщение для получения данных об акциях Apple:
POST localhost:9090/fix-client/v1/market-data-request?symbol=APPL.


В результате будет отправлено сообщение:


8=FIX.4.2 9=117 35=V 34=3 49=FIX_CLIENT 52=20200601-17:10:34.103 56=EXEC 262=FixClient-1 263=0 264=1 146=1 55=AAPL 267=2 269=0 269=1 10=018


Обработка ответа и сохранение рыночных данных


Создадим отдельный сервис (MarketDataService), который будет обрабатывать рыночные данные, полученные в результате отправки запроса. Он будет сохранять полученные данные в объект, записывать их в память и отдавать при запросе по идентификатору инструмента.


Класс для хранения рыночных данных:


@Data
@Accessors(chain = true)
public class MarketDataModel {
   private String symbol;
   private BigDecimal bid;
   private BigDecimal ask;
}

Теперь нужно разобраться, как правильно обработать сообщение с данными и сохранить его.
Вот так выглядит сообщение, отправленное нам в ответ на запрос по символу AAPL:


8=FIX.4.2 9=104 35=W 34=3 49=EXEC 52=20200601-17:10:34.119 56=FIX_CLIENT 55=AAPL 262=FixClient-1 268=1 269=0 270=123.45 10=236.


Так как при запросе мы указывали группы параметров (Bid, Ask и т.д), разбирать сообщение тоже будем по группам:


message.getGroups(NoMDEntries.FIELD).forEach(group -> {
   int type = MsgUtils.getIntField(group, MDEntryType.FIELD).orElse(-1);
   BigDecimal value = MsgUtils.getDecimalField(group, MDEntryPx.FIELD).orElse(BigDecimal.ZERO);

   switch (type) {
       case 0:
           dataModel.setBid(value);
           break;
       case 1:
           dataModel.setAsk(value);
           break;
       default:
           log.warn("Invalid entry type: {}", type);
           break;
   }
});

В теге <269> хранится название параметра, а в теге <270> его значение. Соответственно, если тип параметра = 0 (т.е. Bid), то мы сохраняем значение соответствующего ему тега <270> в поле bid нашего объекта.


Далее проверяем тег <55> – идентификатор инструмента, и сохраняем по нему наши данные:


MsgUtils.getStrField(message, Symbol.FIELD).ifPresent(s -> {
   dataModel.setSymbol(s);
   marketData.put(s, dataModel);
});

Осталось только добавить сохранение данных в метод fromApp в случай обработки сообщения типа MarketDataSnapshotFullRefresh:


case MARKET_DATA_SNAPSHOT_FULL_REFRESH:
   marketDataService.saveMarketData(message);
   break;

Теперь при получении нашим приложением сообщения типа MarketDataSnapshotFullRefresh будет происходить обработка и сохранение данных в память приложения.


Соответственно в отдельный Rest-Controller добавляем метод получения данных по идентификатору:


@GetMapping
public ResponseEntity<MarketDataModel> getMarketData(@RequestParam("symbol") String symbol) {
   return new ResponseEntity<>(marketDataService.getMarketData(symbol), HttpStatus.OK);
}

Вызвав метод GET localhost:9090/fix-client/v1/market-data?symbol=AAPL
получим ответ:


{
  "symbol": "AAPL",
  "bid": 123.45,
  "ask": null
}

Запуск приложения


Наконец, можем запустить наше приложение, убедиться, что подключение к серверу осуществляется успешно, и попробовать отправить запрос на получение рыночных данных.


Если при запуске приложения в логах отображаются ошибки подключения (ConnectException), как на скриншоте ниже, проверьте, что сервер запущен и что вы указали правильные идентификаторы клиента и сервера и хост и порт для подключения:



В случае успешного запуска клиент и сервер должны обменяться Logon-сообщениями:



Отправим запрос POST localhost:9090/fix-client/v1/market-data-request?symbol=APPL, чтобы вызвать отправку сообщения MDR и убедимся, что сообщение действительно отправлено и ответ на него получен:



Бонус

Кстати, сообщения можно удобно парсить с помощью сайта – просто вставляете текст сообщения и получаете разбор по тегам и значениям:


Теперь вызвав метод GET localhost:9090/fix-client/v1/market-data?symbol=AAPL
мы должны получить ответ:


{
  "symbol": "AAPL",
  "bid": 123.45,
  "ask": null
}

Работает!


Конечно, на таком “игрушечном” примере далеко не уедешь, но для начала он хорошо подходит. Для более сложных примеров и для работы с условиями, приближенными к реальной бирже, можно получить доступ к тестовому контуру Московской биржи (MOEX) — для этого нужно оставить заявку на сайте. Я не нашла аналогичных тестовых контуров у других крупных бирж (именно для подключения напрямую через FIX-протокол), кроме симуляторов биржевой торговли, где выдаются виртуальные деньги и с помощью терминалов осуществляется торговля. Если знаете, где найти хороший тестовый сервер для работы по протоколу FIX, — поделитесь в комментариях, буду благодарна.


В следующей статье я планирую рассмотреть основные виды FIX-сообщений (соответственно дополнить приложение методами для их создания) и далее перейти к подробному рассмотрению процесса создания торговых заявок и их обработки биржей. Все примеры сообщений по-прежнему можно создавать с помощью приложения MiniFIX, если не хотите писать реализацию своего клиента.