Так случилось, что на одном проекте потребовалось реформировать способ обмена данными между различными процессами. Исторически сложившаяся схема была довольно неприглядна. Один процесс периодически перезаписывал свои текущие настройки в виде XML-файла. Второй вычитывал этот файл раз в секунду, проверяя, что в нём поменялось с прошлого раза. Изменения файла вычислялись через множество сравнений текущего и прошлого его состояний, порождая некоторую цепочку действий. Читающий процесс писал в свою очередь другой XML-файл, который читался третьим процессом и т.п. Самое печальное то, что данная схема требовала громоздкого, из раза в раз повторяющегося кода сравнений, который наслаивался при добавлении новых данных.

Была предложена идея замены всего этого зоопарка XML-файлов на систему обмена сообщениями, поддерживающую pub/sub. Активно рассматривались три кандидатуры: NATS, Redis и ZeroMQ. Поскольку планировалось обмениваться не только метаданными, но и большим объёмом бинарных данных в реальном времени, во краю угла стала максимальная пропускная способность. По этой причине пришлось отсеять первые два кандидата, несмотря на их более высокоуровневый и удобный broker-based API (тесты показали, что NATS даёт фору Redis, но где-то на 20% проигрывает ZeroMQ).

Далее стал вопрос о способе синхронизации состояния между процессами. Логичной показалась следующая схема:

  1. Клиенты после подключения к серверу вычитывают его полное состояние.
  2. Далее при изменении состояния сервер публикует патчи (изменения), на которые подписаны клиенты.
  3. При получении патча клиент вызывает обработчики, соответствующие изменениям (событиям) в патче, а затем накладывает его на предыдущее состояние сервера.

В эту схему прекрасно укладывалось использование JSON Patch, что позволило не изобретать велосипед для генерации и наложения патчей. Таким образом, библиотека JSON, имеющая встроенную поддержку JSON Patch, стала идеальной основой для нашей библиотеки для синхронизации состояния.


Итак, после пары недель работы была написана небольшая библиотека, включавшая в себя следующие коммуникационные примитивы:


  1. Publisher — простая обёртка над PUB-сокетом.
  2. Subscriber — обёртка над SUB-сокетом, позволяющая асинхронно обрабатывать нотификации в выделенном потоке.
  3. Requester — обёртка над REQ-советом, позволяющая асинхронно отправить запрос и обработать ответ в выделенном потоке.
  4. Replier — обёртка над REP-сокетом, позволяющая обрабатывать входящие запросы в выделенном потоке.

На основе этих примитивов были реализованы Client и Server,  позволяющие синхронизировать состояние, а также назначать callbacks на конкретные его изменения.


Пример кода и его вывод
#include <chrono>
#include <map>
#include <string>
#include <vector>

#include "syncer.h"

using namespace nlohmann;
using namespace std;
using namespace std::chrono;
using namespace syncer;

struct Site {
  int temperature;
  int pressure;
};

static inline void to_json(json& j, const Site& s) {
  j = json();
  j["temperature"] = s.temperature;
  j["pressure"] = s.pressure;
}

static inline void from_json(const json& j, Site& s) {
  s.temperature = j.at("temperature").get<int>();
  s.pressure = j.at("pressure").get<int>();
}

struct State {
  map<string, Site> sites;
  string forecast;
};

static inline void to_json(json& j, const State& s) {
  j = json();
  j["sites"] = s.sites;
  j["forecast"] = s.forecast;
}

static inline void from_json(const json& j, State& s) {
  s.sites = j.at("sites").get<map<string, Site>>();
  s.forecast = j.at("forecast").get<string>();
}

PatchOpRouter<State> CreateRouter() {
  PatchOpRouter<State> router;

  router.AddCallback<int>(R"(/sites/(\w+)/temperature)", PATCH_OP_ANY,
    [] (const State& old, const smatch& m, PatchOp op, int t) {
      cout << "Temperature in " << m[1].str() << " has changed: "
           << old.sites.at(m[1].str()).temperature << " -> " << t << endl;
    });

  router.AddCallback<Site>(R"(/sites/(\w+)$)", PATCH_OP_ADD,
    [] (const State&, const smatch& m, PatchOp op, const Site& s) {
      cout << "Site added: " << m[1].str()
           << " (temperature: " << s.temperature
           << ", pressure: " << s.pressure << ")" << endl;
    });

  router.AddCallback<Site>(R"(/sites/(\w+)$)", PATCH_OP_REMOVE,
    [] (const State&, const smatch& m, PatchOp op, const Site&) {
      cout << "Site removed: " << m[1].str() << endl;
    });

  return router;
}

int main() {
  State state;
  state.sites["forest"] = { 51, 29 };
  state.sites["lake"] = { 49, 31 };
  state.forecast = "cloudy and rainy";
  Server<State> server("tcp://*:5000", "tcp://*:5001", state);

  Client<State> client("tcp://localhost:5000",
                       "tcp://localhost:5001",
                       CreateRouter());

  this_thread::sleep_for(milliseconds(100));

  cout << "Forecast: " << client.data().forecast << endl;

  state.sites.erase("lake");
  state.sites["forest"] = { 50, 28 };
  state.sites["desert"] = { 55, 30 };
  state.forecast = "cloudy and rainy";
  server.Update(state);

  this_thread::sleep_for(milliseconds(100));

  return 0;
}

Результатом выполнения этого кода будет следующий вывод:


Site added: forest (temperature: 51, pressure: 29)
Site added: lake (temperature: 49, pressure: 31)
Forecast: cloudy and rainy
Temperature in forest has changed: 51 -> 50
Site removed: lake
Site added: desert (temperature: 55, pressure: 30)

Конечно, выбранный подход далёк от оптимальности с точки зрения производительности, поскольку щедро выделяет потоки под индивидуальные сокеты, вместо использования Epoll. Потому он будет плохо подходить для систем, требующих большого числа одновременных соединений. Будем надеяться, что для большинства случаев это некритично.


Итак, появилась возможность сильно упростить большую часть межпроцессной коммуникации. Это будет не так просто сделать для legacy-кода, поскольку ручные проверки изменений сильно перемешаны с остальной функциональностью, а потому придётся резать "по-живому". С другой стороны, реализовывать синхронизацию для нового кода стало одним удовольствием.

Комментарии (4)


  1. thatsme
    22.10.2017 18:41

    Года 2 назад делали подобный костыль для Zabbix, что-бы сократить объём данных передаваемых ч-з VSAT. Только кодить для этого не понадобилось от слова совсем: curl + JSON pretty print и стандартная утлилита diff с ключами bBwup. Первая передача после инициации, — полное состояние, а потом только патчи. Передача раз в 4-е секунды. Т.е. ни о каких серьёзных нагрузках как-бы речи не шло, но ~400 станций присылали данные на центральный сервер.
    Ах да… транспорт netcat с постоянно открытым соединением. Сеть приватная, так что даже никакого SSL. Ну кроме того шифрования которое в канале провайдера VSAT. Костыль, костылём, но накладные расходы минимальные.


  1. oYASo
    22.10.2017 22:20

    Второй вычитывал этот файл раз в секунду, проверяя, что в нём поменялось с прошлого раза.

    Можно было заюзать inotify (или аналоги на других системах) для удобного асинхронного информирования об изменениях в файлах. Ну и легким движением смотреть diff'ы.


  1. Ryppka
    23.10.2017 07:10

    А зачем в Вашем примере объявления функций static inline? В C — понятно, а в плюсах?


  1. Dorzho
    24.10.2017 08:56
    +1

    синхронизации состояния фотка первая в тему))