Подкинули задачу сделать микросервис, который получает данные от RabbitMQ, обрабатывает, и отправляет данные дальше по этапу в RabbitMQ. После отправки задания, я посмотрел на то что поучилось. Оказалось, что этот набор компонентов можно использовать для быстрого прототипирования pipeline архитектуры


Используемые компоненты:



Для примера буду делать микросервис для выдачи рейтинга игроков. От ядра системы в микросервис приходят следующие сообщения:


  • player_registered(id,name);
  • player_renamed(id,name);
  • player_won(id, points).

Сервис раз в минуту должен отсылать сообщение с содержимым рейтинга.Рейтинг сортируется по набранным очкам за календарную неделю.


REACT-CPP


REACT-CPP — это обертка над libev на C++11. Эта библиотека нужна для организации цикла обработка событий(event loop). Т.к. кроме работы с сокетом потребуются таймеры и обработчики unix сигналов.


class Application
{
public:

    Application();
    ~Application();

    using IntervalWatcherPtr = std::shared_ptr<React::IntervalWatcher>;

    void run();
    void shutdown();
    //...

private:

    bool onMinute();
    //...    

private:

    React::MainLoop m_loop;
    IntervalWatcherPtr m_minuteTimer;
    //...
};

void Application::run()
{
    m_minuteTimer = m_loop.onInterval(5.0, 60.0, std::bind(&Application::onMinute, this));

    m_loop.onSignal(SIGTERM, [this]() -> bool
    {
        shutdown();
        return false;
    });

    m_loop.onSignal(SIGUSR1, [this]()->bool{
        cleanRating();
        return true;
    });

    //...
    m_loop.run();
}

bool Application::onMinute()
{
    calculateRating();
    sendRating();
    return true;
}

Тут создаю таймер который стартует через 5 секунд и который будет вызывать обработчик каждые 60 секунд. Любой приличный демон/сервис должен иметь обработчик SIGTERM, что бы из вне попросить его корректно завершится. Что касается обработчика SIGUSR1 тут можно самостоятельно вычислять начало/конец недели через Boost.Date_Time, но мне тупо лень, когда в GNU/Linux есть cron+pkill.


AMQP-CPP


С тех пор как опубликовал RabbitMQ tutorials на C++ AMQP-CPP обзавелась реализацией обработчика на libev и libuv.


Подключение и обработка сообщения:


void Application::createChannel(AMQP::TcpConnection &connection)
{
    m_channel = std::make_unique<AMQP::TcpChannel>(&connection);

    m_channel->declareQueue(m_cfg.source().name, AMQP::durable)
        .onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount)
                   {
                       LOG(INFO) << "Declared queue "
                                 << name
                                 << ", message count: "
                                 << messagecount;

                       m_channel->consume(m_cfg.source().name)
                           .onReceived([&](const AMQP::Message &message,
                                           uint64_t deliveryTag,
                                           bool redelivered)
                                       {
                                           onMessage(message, deliveryTag, redelivered);
                                       })
                           .onError([](const char *message)
                                    {
                                        LOG(ERROR) << "Error consume:" << message;
                                        APP->shutdown();
                                    });
                   })
        .onError([&](const char *message)
                 {
                     LOG(ERROR) << "Error declare queue:" << message;
                     shutdown();
                 });
}

void Application::onMessage(const AMQP::Message &message,
                            uint64_t deliveryTag,
                            bool redelivered)
{
    parseMessage(message);
    m_channel->ack(deliveryTag);
}

Публикация сообщения:


AMQP::Envelope env(s.GetString());

m_channel->publish("", m_cfg.destination().name, env);

LevelDB


Может потребоваться локальное хранилище данных. Взял LelevDB, я о нем писал в Использование LevelDB. Сделал лишь небольшую RAII обертку:


Код обертки
class DataBase
{
public:

    DataBase();

    bool open(const std::string &path2base, bool compression = true);

    bool put(const std::string &key, const ByteArray &value, bool sync = false);
    ByteArray get(const std::string &key);

    Snapshot snapshot();

    Iterator iterator();

private:

    std::shared_ptr<leveldb::DB> m_backend;
};

class Snapshot
{
public:

    Snapshot();

    ~Snapshot();

    ByteArray get(const std::string &key);

    Iterator iterator();

private:

    Snapshot(const std::weak_ptr<leveldb::DB> &backend, const leveldb::Snapshot *snapshot);

private:

    friend class DataBase;

    std::weak_ptr<leveldb::DB> m_backend;
    const leveldb::Snapshot *m_shapshot;
};

class Iterator
{
public:

    Iterator(std::unique_ptr<leveldb::Iterator> rawIterator);
    Iterator(Iterator &&iter);

    /*!
     * Create empty iterator
     */
    Iterator() = default;

    ~Iterator();

    bool isValid() const noexcept;

    void next();

    void prev();

    std::string key();
    ByteArray value();

    /*!
     * Seek to first
     */
    void toFirst();

    /*!
     * Seek to last
     */
    void toLast();

    Iterator(const Iterator &) = delete;
    Iterator &operator=(const Iterator &) = delete;

private:

    std::unique_ptr<leveldb::Iterator> m_iterator;
};

LevelDB используется для сохранения/востановления состояния.


void Application::loadFromLocalStorage()
{
    auto snapshot = m_localStorage->snapshot();
    auto iter = snapshot.iterator();
    iter.toFirst();
    while (iter.isValid()) {
        auto player = new Player(iter.value());
        m_id2player[player->id] = player;
        m_players.push_back(player);
        iter.next();
    }
}

void Application::updatePlayerInBD(const Player *player)
{
    if (!m_localStorage->put(std::to_string(player->id), player->serialize())) {
        LOG(ERROR) << "[" << player->id << ", "
                   << player->name
                   << "] is not updated in the database";
    }
}

Логика сервиса


Данные приходят в формате JSON. Разбирает json используя RapidJSON, ищу подходящий метод, вызываю нужный обработчик:


void Application::parseMessage(const AMQP::Message &message)
{
    /*
     * Схемка имеет вид
     * {
     *   "method":"player_registered",
     *   "params":{
     *   ...
     *   }
     * }
     */
    rapidjson::Document doc;
    doc.Parse(message.body(), message.bodySize());

    const std::string method = doc["method"].GetString();
    auto iter = m_handlers.find(method);
    if (iter != m_handlers.end()) {
        iter->second(*this, doc["params"]);
    }
    else {
        LOG(WARNING) << "Unknown method:" << method;
    }
}

Сами методы простые:


void Application::onPlayerRegistered(const JValue &params)
{
    auto obj = params.GetObject();
    const uint64_t playerId = obj["id"].GetUint64();
    if (!isRegistred(playerId)) {
        auto player = new Player;
        player->id = playerId;
        player->name = obj["name"].GetString();
        m_players.push_back(player);
        m_id2player[playerId] = player;
        updatePlayerInBD(player);
    }
}

void Application::onPlayerRenamed(const JValue &params)
{
    auto obj = params.GetObject();
    const uint64_t playerId = obj["id"].GetUint64();
    if (isRegistred(playerId)) {
        auto player = m_id2player[playerId];
        player->name = obj["name"].GetString();
        updatePlayerInBD(player);
    }
    else {
        LOG(WARNING) << "Renaming an unknown user[" << playerId << "]";
    }
}

void Application::onPlayerWon(const JValue &params)
{
    auto obj = params.GetObject();
    const uint64_t playerId = obj["id"].GetUint64();
    if (isRegistred(playerId)) {
        auto player = m_id2player[playerId];
        player->points += obj["points"].GetInt64();
        updatePlayerInBD(player);
    }
    else {
        LOG(WARNING) << "Unknown player[" << playerId << "]";
    }
}

Раз в минуту сортируем игроков и отправляем рейтинг:


bool Application::onMinute()
{
    calculateRating();
    sendRating();
    return true;
}

void Application::calculateRating()
{
    std::sort(m_players.begin(), m_players.end(), [](const Player *a, const Player *b)
    {
        return a->points > b->points;
    });
}

void Application::sendRating()
{
    using namespace rapidjson;

    StringBuffer s;
    Writer<StringBuffer> writer(s);
    writer.StartArray();

    const size_t count = std::min(m_players.size(), size_t(10));
    for (size_t i = 0;
         i < count;
         ++i) {
        writer.StartObject();

        writer.Key("id");
        writer.Uint64(m_players[i]->id);

        writer.Key("name");
        writer.String(m_players[i]->name.c_str());

        writer.Key("points");
        writer.Int64(m_players[i]->points);

        writer.EndObject();
    }

    writer.EndArray();
    AMQP::Envelope env(s.GetString());

    m_channel->publish("", m_cfg.destination().name, env);
}

Весь код доступен на GitHub'e. Исходники библиотек поставляются вместе с сервисом и собираются автоматически на GNU/Linux с gcc.


Подведем итоги, что имеем:


  • event loop с таймерами, обработчиками сигналов и всеми остальными плюшками libev;
  • работа с RabbitMQ;
  • встроенное key-value хранилище;
  • поддержка json.
Поделиться с друзьями
-->

Комментарии (8)


  1. monah_tuk
    18.11.2016 13:40

    Сырые указатели… Я бы, пожалуй, хранил бы по значению в каком-нить векторе, для представления сортированного списка использовал бы подход "container of references": http://www.cplusplus.com/reference/functional/reference_wrapper. Да, по сути будет две коллекции: одна хранит данные, а вторая позволяет изменять их представление: сортировка, фильтрация и т.п. Плюс данные хранятся максимально монолитным блоком.


    Хотя… Для такой структуры можно было бы посмотреть на хранение вообще по значению и заниматься оптимизациями когда это реально потребуется. Если потребуется.


    1. Bas1l
      18.11.2016 14:09

      А чем вам сырые указатели не угодили? Я заметил их только в передаче в функцию объектов Player (ну и char*). А если у объекта единственный owner и вы не передаете ownership в функцию, то вы не то что можете, вы обязаны передать сырой указатель/ссылку в эту функцию. Где-то у Herb Sutter табличка была про это.


      1. RPG18
        18.11.2016 14:32
        +1

        Да. Это еще отмечено C++ Core Guidelines. Возможно monah_tuk не нравится писать отдельный метод освобождения памяти.


        1. monah_tuk
          18.11.2016 16:14

          Скорее это. Особенно когда контейнер нужно часто модифицировать.


          1. RPG18
            18.11.2016 16:30

            Это абстрактная задачка, которую придумал для демонстрации.


            1. monah_tuk
              18.11.2016 17:05
              +1

              Это понятно. Просто триггер сработал. Я уже не раз сталкивался с косяками связанными с хранением сырых указателей в контейнерах, а потом: забыли удалить, удалили в другом месте, удалили, а в контейнере оставили. А на всяких RTOS второе и третье ещё и не сразу выстрелить может.


              Поэтому руководствуюсь простым правилом (и коллегам его продвигаю): можно обойтись без сырого указателя — обходись. Ссылки, набор умных указателей, контейнеры — инструментов для этого много в современном стандарте. Пока не подводило.


      1. monah_tuk
        18.11.2016 16:54
        +2

        Меня не передача озадачила, а хранение в m_players. По сути, там можно вполне себе хранить std::unique_ptr<Player>. Плюс эти же указатели передаются в m_id2player и потом через него же уничтожаются в freeMemPlayers(), при этом, внимание, не очищая их в m_players, я понимаю, что функция вызывается в текущем виде только из деструктора, который, в данный момент, вызывается только при остановке приложения, но, как мне кажется, тут как раз передача владения по сырому указателю. Да и собственно не понятно, где владелец: m_players или m_id2player.


        Ну и вот моё предложение по этому моменту: https://github.com/RPG-18/SimpleMicroservice/pull/1


        1. RPG18
          18.11.2016 17:29

          Спасибо принял.