Каждый проект рано или поздно обрастает разными технологиями, часть из которых может выполнять схожие или даже одинаковые функции. Наряду с развитием продукта это несет и скрытые трудности, в первую очередь для команды, которая должна поддерживать и развивать весь «зоопарк». 

Меня зовут Иван Банников, я ведущий разработчик VK Tech. В этом материале я расскажу об основных предпосылках разрастания используемого стека, а также на примере IoT-платформы, которую мы поддерживали, поделюсь опытом избавления от «зоопарка технологий» в области обработки сообщений.

Причины появления «зоопарка технологий»


Риск появления «зоопарка» возникает из-за того, что для каждого этапа обработки данных есть огромный выбор технологий и инструментов:



Но эта причина не основная — зачастую «зоопарк технологий» образуется не одномоментно, для этого есть предпосылки и причины, в том числе долгосрочные:

  1. Смена команд и ключевых сотрудников, утрата скрытых знаний. Когда уходят ведущие специалисты с основными компетенциями, команда часто не может продолжать использовать привычный стек: банально не хватает знаний. В результате для новых решений используют новые инструменты, одновременно продолжая изучать и использовать прежние технологии для поддержки уже работающих процессов.
  2. Внезапные и противоречивые требования бизнеса. Часто на проверку гипотез или создание MVP бизнес выделяет минимум времени. Из-за этого команды вынуждены быстро подбирать инструменты под новую фичу, часто не имея возможности допиливать уже имеющиеся решения.
  3. Обратная сторона мантры «каждой задаче — свой инструмент». «Каждой гайке — свой ключ» — универсальный подход, который гарантирует, что все задачи будут решены лучшим способом и с минимальными издержками. Но такой подход как будто требует искать специальные инструменты для каждой задачи вместо выработки универсального стека. Это и приводит к разрастанию «зоопарка технологий».
  4. Желание изучить и применить новые решения, нежелание работать с устаревшими решениями. Иногда к разрастанию стека приводит желание следовать трендам и применять новомодные технологии вместо классических. Результат очевиден: команда вынуждена одновременно поддерживать как старые, так и новые решения, поэтому бесшовный переход без появления «зоопарка технологий» невозможен.

Распределенная архитектура IoT-платформы


В архитектурах IoT-платформ выделяют несколько этапов передачи данных. В нашем случае им соответствуют разные компоненты:

  • источники данных — устройства, промышленные и корпоративные системы;
  • механизмы передачи данных — Gateway или Queue;
  • механизмы препроцессинга для конвертации данных из формата в формат, Stream Processing, сбора и систематизации данных от IoT-устройств;
  • Hot Path — приоритетный путь данных в аналитику и системы принятия решений. Нужен для отслеживания информации или предварительно обработанных показателей в консистентном виде в реальном времени или близком к нему;
  • архив — хаб для хранения потоков данных для последующего ретроспективного анализа; Также нужен для соблюдения технических, бизнесовых и законодательных требований по хранению корпоративных и персональных данных; 
  • потребители данных — системы принятия решений, инструменты виртуализации и аналитики.



На каждом из этапов можно применить широкий стек технологий: разные СУБД, ОС, брокеры очередей и десятки языков программирования.

Грабли


«Зоопарк технологий» может появиться в обработке сообщений, аналитике, хабах хранения данных или на всех этапах проекта сразу.

В нашем случае проблема локализовалась именно в обработке сообщений: IoT-платформа развивается много лет и в разное время задействовали для обработки сообщений разные инструменты:

  • RabbitMQ для выполнения задач Celery, прослушки и накапливания сообщений из базы данных (в нашем случае — PostgreSQL) и других задач;
  • Redis в качестве кэша для второй реплики Celery, а также под IoT-сервисы и другие задачи;
  • MQTT-брокер для обмена сообщениями между сервисами (Service-to-service);
  • PostgreSQL (LISTEN/NOTIFY), который использовался для обмена информацией между сервисами о любых изменениях;
  • Web-хуки для общения между сервисами, отправки уведомлений о любых изменениях внутри сервиса;
  • Graphql-подписки, предоставлемые graphql-сервером Hasura, журналы сообщений этого движка;
  • Tarantool Queue.



Наличие такого широкого стека создавало трудности:

  1. Часть инструментов «зоопарка» постепенно редуцировались. Они обслуживали всего по одной-две задачи, но все равно требовали внимания и поддержки.
  2. Управлять «зоопарком» сложно: надо знать особенности каждого инструмента, принципы работы с ними, возможности настройки и масштабирования, алгоритм действий при инцидентах.
  3. Мы страдали от ненадежности и отсутствия гарантий. Например, не знали, какие гарантии дает Redis, а MQTT-брокер вообще не предназначен для обмена сообщениями между сервисами.

Анализ проблемы


Постепенно накопление трудностей мешало бизнесу и ИТ-команде не только динамично развивать, но и поддерживать IoT-платформу. Мы понимали это, поэтому решили проанализировать используемый стек и найти артефакты, от которых можно отказаться.

Во-первых, мы изучили паттерны использования всех инструментов «зоопарка». Анализ показал, что большинство сценариев использования тех или иных решений реализует Pubsub — поведенческий шаблон проектирования, при котором одно и то же сообщение должно передаваться множеству пользователей.

Во-вторых, мы исследовали загрузку и соответствие самого стека: 

  • RabbitMQ оказался практически неиспользуемым. От него мы решили полностью уйти.
  • Redis выполнял задачи, которые мы смогли закрыть с помощью Tarantool Queue. Для этого развернули отдельную реализацию Tarantool Queue (впоследствии отказались и от нее).
  • MQTT-брокер использовался как Service-to-service-решение, в то время как его основные сценарии — Device-to-device, Device-to-service и Service-to-device. Использование его в качестве средства общения между сервисами было нерелевантно, к тому же он использовался всего в одном месте и, вероятно, был использован для такого сценария, потому что в момент разработки оказался под рукой. Его можно было исключить как средство общения между сервиса — так мы и поступили, оставив его только для работы с устройствами. 
  • PostgreSQL, web-хуки и часть функций Hasura можно было заменить шиной событий.

В результате мы пришли к выводу, что стоит применить имеющийся опыт разработки и сделать собственное решение для Pubsub под свои задачи.

От проблем к решению: разработка шины данных


На этапе разработки концепции шины событий мы выделили два основных технических требования: 

  1. Соответствие шаблону Pubsub: один поток сообщений — много потребителей, каждый со своей скоростью чтения.
  2. Обязательное хранение информации в течение некоторого времени для возможности изучить историю событий — например, при расследовании инцидентов.

В качестве прототипа мы взяли Tarantool Queue и отталкивались от решений, реализованных в нем (и на нем). На основе Tarantool Queue мы уже реализовали решение для Pubsub, но оно получилось не очень удобным:

  • для каждого потребителя создавалась отдельная очередь;
  • поскольку создавалось много очередей, одно и то же сообщение копировалось во все очереди, что в итоге приводило к большому объему;
  • Tarantool Queue спроектирован для обработки одиночных сообщений, когда потребитель забирает сообщения из очереди по одному и по одному же их подтверждает.

Проанализировав проблемы, мы решили немного изменить это решение, реализовав свой вариант очереди. Сервис получил внутреннее название Tarantool Streams. Основная идея — заменить множество очередей на одну сущность, ими стали потоки. 

Мы пришли к идее смещения с оффсетами. Есть последовательность записанных в Space данных, в котором каждое сообщение может помечаться монотонно возрастающим идентификатором или таймстемпом, которые гарантированно уникальны. Также мы реализовали отслеживание количества запрошенных, прочитанных и подтвержденных сообщений. Так получилось два смещения: 

  • чтения — подтверждает, что пользователь запросил и получил n сообщений;
  • подтверждения — верифицирует, что полученные сообщения или часть из них прочитаны.

Реализация с двумя оффсетами позволяет исключить ситуации, при которых сообщения теряются, если они получены, но не обработаны и, соответственно, не подтверждены.

В результате мы получили два Space:

  • основной, в котором записаны все сообщения;
  • дополнительный, в котором для каждого потребителя создается отдельная запись, обозначающая смещения (чтение и подтверждение). 

Дополнительно мы реализовали Fiber, который отвечает за удаление сообщений после прочтения всеми потребителями. При этом удаление можно настроить:

  • по оффсету — удаление сообщения сразу после прочтения самым медленным потребителем;
  • по TTL — удаление по заданному времени, которое позволяет исключить случайное или нежелательное удаление критических данных.

В нашем случае первым источником данных был PostgreSQL, в котором хранились пользовательские данные по приборам — и Tarantool Streams изначально был ориентирован на такую интеграцию. При этом через сервис-прослойку (в том числе самописную) к Tarantool Streams можно прикрутить и другие БД.



Замеры под нагрузкой в ходе синтетических тестов показали производительность решения до 30 000 сообщений в секунду, что полностью отвечает нашим запросам и даже превосходит их. 
В результате наш сервис позволил объединить все базы данных и скидывать все сообщения от внутренней IoT-платформы, интеграций, MQTT-брокера и других БД в общий поток событий (Event Bus). 

Как это выглядит в коде


Рассмотрим построение шины событий, работающей на одном узле Tarantool и без шардирования. Сделаем небольшую заготовку в виде файла bus.lua, которую будем расширять по мере разработки. Чтобы шина работала, необходимо сконфигурировать наш Tarantool.
Выглядит это следующим образом:

--
-- TARANTOOL CONFIGURATION
--
box.cfg {
    listen = 3301
}

Здесь мы конфигурируем наш узел Tarantool так, чтобы он слушал порт 3301. Далее в этом месте будут добавляться дополнительные настройки.

Для того чтобы реализовать поток, понадобятся спейс, в котором будут храниться записанные сообщения, и генератор последовательных идентификаторов (в качестве него будем использовать последовательности Tarantool). В спейсе с сообщениями, помимо последовательного идентификатора и самого сообщения, будем хранить еще и время добавления сообщения — это понадобится, чтобы удалять сообщения по истечении времени жизни (TTL).

--
-- SCHEMA
--
box.once('bus:stream:1', function()
    box.schema.sequence.create('stream_seq', {
        start = 0,
        min = 0,
        if_not_exists = true
    })

    local stream = box.schema.space.create('stream', {
        engine = 'memtx',
        if_not_exists = true,
        format = {
            { name = 'offset', type = 'unsigned' },
            { name = 'timestamp', type = 'unsigned' },
            { name = 'payload', type = '*' }
        }
    })
    stream:create_index('offset', {
        parts = { 'offset' },
        type = 'tree',
        if_not_exists = true,
        sequence = 'stream_seq'
    })
    stream:create_index('timestamp', {
        parts = { 'timestamp' },
        type = 'tree',
        if_not_exists = true,
        unique = false
    })
end)

Основа для потока готова. Чтобы поместить сообщение в стрим, нужно просто выполнить вставку. Код функции, которая выполняет вставку:

--
-- IMPORTS AND MODULE LOCALS
--
local clock = require 'clock'

--
-- BUS API
--
function put(payload)
    local inserted = box.space.stream:insert {
        nil,
        math.floor(clock.realtime()),
        payload
    }
    return {
        offset = inserted.offset,
        timestamp = inserted.timestamp
    }
end

Во время вставки генерируется идентификатор из последовательности, и в результате в переменной inserted мы получим кортеж с уже сгенерированным идентификатором, который можем отдать наружу, если потребуется.

Сразу отметим один момент: метод put не проводит дедубликации данных. Каждый вызов помещает новую запись в поток. Если процесс-писатель получил ошибку при вызове метода put и решил повторить операцию записи заново, так как не знает, записано ли его сообщение или он просто получил ошибку по сети, сообщение от писателя будет записано повторно. Поэтому ответственность за корректную обработку повторяющихся данных мы будем возлагать на читателя. Таким образом, гарантии для писателя можем обозначить как At least once.

Теперь, когда есть спейс с последовательными сообщениями, мы можем забирать из него данные с определенного смещения в некотором количестве. Это можно сделать с помощью такого кода:

--
-- PRIVATE FUNCTIONS
--
local function tuple_to_map(tuple)
    return tuple:tomap({ names_only = true })
end

--
-- BUS API
--
function take(offset, limit)
    return box.space.stream
        :pairs(offset, { iterator = box.index.GE })
        :take(limit)
        :map(tuple_to_map)
        :totable()
end

Таким образом, можно прочесть по частям всю ленту событий, начиная с некоторого оффсета. Такое решение уже можно начинать использовать, но в нем имеется ряд проблем, которые надо решить. Рассмотрим их.

Первая проблема — это состояние читателя. Процесс, который читает поток, может упасть и забыть, с какого смещения ему продолжать читать поток дальше. Повторная вычитка всего потока может быть долгой операцией, к тому же бессмысленной, даже несмотря на то, что на читателя мы возложили ответственность за обработку повторов.

Попробуем решить эту проблему. Предположим, что свое состояние читатель хранит в каком-то файле, рядом с собой, так что падения ему не страшны. Упал, поднялся, прочитал последний оффсет и продолжил чтение. И это решение работает, так как каждый читатель может хранить свое состояние и восстанавливать его после сбоев. Но теперь возникает другая проблема: шина событий ограничена по памяти, а следовательно, необходимо удалять данные по истечении времени жизни или после того, как их прочитали. И здесь также неясно, когда их удалять. И все еще остается актуальной проблема восстановления состояния читателя, даже несмотря на то, что он может читать из файла, а файл может быть поврежден, утерян и так далее.

Решением всех указанных проблем будет перенос состояний читателя в Tarantool. Для этого добавим еще один спейс:

--
-- SCHEMA
--
box.once('bus:consumers:1', function()
    local consumers = box.schema.space.create('consumers', {
        engine = 'memtx',
        if_not_exists = true,
        format = {
            { name = 'uuid', type = 'uuid' },
            { name = 'offset', type = 'unsigned' },
            { name = 'inflight', type = 'unsigned' },
            { name = 'limit', type = 'unsigned' }
        }
    })
    consumers:create_index('uuid', {
        parts = { 'uuid' },
        type = 'tree',
        if_not_exists = true
    })
end)

Помимо текущего смещения, также будем хранить количество прочитанных сообщений за один вызов take и максимальное количество сообщений, которое читатель может взять за один раз.

Теперь сценарий взаимодействия с потоком усложняется: читатель должен о себе как-то заявить, а именно: указать свой CID и параметры чтения потока. Сделаем это с помощью метода seek:

--
-- PRIVATE FUNCTIONS
--
local function stream_first(offset)
    return box.space.stream
        :pairs(offset, { iterator = box.index.GE })
        :nth(1)
end

--
-- BUS API
--
function seek(cid, offset, limit)
    local message = stream_first(offset)

    if message == nil then
        local seq = box.sequence.stream_seq
        local ok, result = pcall(seq.current, seq)
        if not ok then
            offset = 0
        else
            offset = result
        end
    else
        offset = message.offset
    end

    box.space.consumers:replace {
        cid,
        offset,
        offset,
        limit
    }

    return offset
end

Теперь можно зарегистрировать читателя и задать некоторую позицию, с которой начинать чтение потока, а также максимальное количество сообщений.

Приведем теперь метод take:

--
-- PRIVATE FUNCTIONS
--
local function check_consumer(cid)
    local consumer = box.space.consumers:get(cid)
    if consumer == nil then
        error('no consumer with this cid')
    end

    return consumer
end

--
-- BUS API
--
function take(cid)
    local consumer = check_consumer(cid)
    local messages = box.space.stream
        :pairs(consumer.offset, { iterator = box.index.GE })
        :take(consumer.limit)
        :map(tuple_to_map)
        :totable()

    if #messages == 0 then
        return { messages = {}, offset = nil }
    end

    local first = messages[1]
    local offset = first.offset

    box.space.consumers:update(cid, {
        { '=', 'inflight', offset + #messages },
        { '=', 'offset', offset }
    })

    return { messages = messages, offset = offset }
end

Теперь метод take зависит от состояния читателя, и уже нельзя взять и прочитать часть потока в произвольном месте. Метод take можно вызывать неоднократно, и он будет выдавать те же самые данные плюс новые сообщения, если общее количество сообщений влезает в размер пакета. Если читатель стоит в конце потока, то, неоднократно вызывая метод take, можно увидеть, как последовательно размер сообщений увеличивается, пока не достигнет заданного максимума, по мере появления данных в потоке. Данные между вызовами take будут выдаваться одни и те же.

Чтобы сместить позицию чтения потока, нужно обновить указатели в состоянии читателя. Для этого введем функцию commit:

--
-- BUS API
--
function commit(cid)
    local consumer = check_consumer(cid)
    consumer = box.space.consumers:update(
        cid,
        {{ '=', 'offset', consumer.inflight }}
    )
    return consumer.offset
end

Метод commit сдвигает позицию чтения на количество полученных ранее сообщений.

Теперь мы умеем согласованно читать из потока без потери сообщений. Возникла другая проблема. Когда читатель прочитал весь поток и дошел до конца, в конце каждый вызов take, пока не появились новые сообщения в потоке, будет выдавать пустой список. Постоянный опрос take в цикле создаст лишнюю нагрузку на сеть и на шину, поэтому необходимо сделать механизм, позволяющий заблокировать читателя до момента, пока не появятся новые сообщения. Решение простое: воспользоваться условными переменными fiber.cond. Условная переменная позволяет заблокировать файбер до тех пор, пока другой файбер не активизирует его. Для этого необходимо изменить функцию put, добавив в нее активацию условной переменной, и ввести отдельный метод wait для ожидания новых данных.

--
-- IMPORTS AND MODULE LOCALS
--
local fiber = require 'fiber'

local waiter = fiber.cond()

--
-- BUS API
--
function put(payload)
    local inserted = box.space.stream:insert {
        nil,
        math.floor(clock.realtime()),
        payload
    }
    waiter:broadcast()
    return {
        offset = inserted.offset,
        timestamp = inserted.timestamp
    }
end

function wait(cid, timeout)
    local consumer = check_consumer(cid)
    local message = stream_first(consumer.offset)
    if message ~= nil then
        return true
    end

    return waiter:wait(timeout)
end

Теперь у нас есть полный набор методов, позволяющих последовательно и без потерь читать поток, ждать новых сообщений в потоке.

Осталось решить последнюю проблему — проблему очистки потока. У этой проблемы нет однозначного решения, поскольку каждое решение требует чем-то пожертвовать. Рассмотрим вариант, когда сообщения из потока удаляются только после прочтения самым медленным читателем, то есть после наименьшего подтвержденного смещения. Это решение потенциально может привести к ситуации, когда какой-то читатель отвалился надолго (а то и навсегда), а последнее смещение так и осталось. Это грозит тем, что сообщения начнут копиться в потоке и могут заполнить всю память узла Tarantool. Другое, более жесткое решение — удалять сообщения из потока по истечении времени жизни. Это приведет к тому, что запаздывающий читатель может терять сообщения. Таким образом, необходимо находить баланс между этими проблемами. Для нашего случая в IoT-платформе актуальность данных важнее, чем потери, поэтому, если какой-то читатель запаздывает, это означает, что у него проблемы и надо с ним разбираться.

Для удаления данных воспользуемся модулем expirationd. Установим его в локальное дерево проекта:

tarantoolctl rocks install expirationd

И после этого добавим следующий код, который будет запускать задачу expirationd:

--
-- IMPORTS AND MODULE LOCALS
--
local expirationd = require 'expirationd'

local stream_ttl = 60
local stream_scan_delay = 1

--
-- PRIVATE FUNCTIONS
--
local function is_expired()
	
return true
end

--
-- TASKS
--
expirationd.start(
    'stream_expiration',
    box.space.stream.id,
    is_expired,
    {
        atomic_iteration = true,
        start_key = function()
            return math.floor(clock.realtime()) - stream_ttl
        end,
        index = 'timestamp',
        iterator_type = 'LE',
        full_scan_delay = stream_scan_delay
    }
)

Expiration создаст задачу, которая с периодичностью в 1 секунду начинает сканирование индекса timestamp у потока, в качестве ключа берет момент времени, отсчитываемый от текущего на ttl секунд. Сканирование происходит в сторону прошлого, что задается итератором GE. Функция is_expired нужна, чтобы проверить, что кортеж должен быть удален. В ней можно реализовывать различные правила — например, смягчить требования по удалению данных из потока.

Для полноты примера приведем код читателя:

--
-- BUS CONSUMER
--
local net = require 'net.box'
local uuid = require 'uuid'
local json = require 'json'
local fiber = require 'fiber'

local uri = os.getenv('BUS_URI') or '127.0.0.1:3301'

local bus = net.connect(uri)
local cid = uuid.new()

bus:call('stream_seek', { cid, 0, 10 })
while true do	
    if bus:call('stream_wait', { cid }) then
        local slice = bus:call('stream_take', { cid })
        if #slice.messages > 0 then
            print('consumer: ', cid, json.encode(slice.messages))
            bus:call('stream_commit', { cid, slice.offset })
            fiber.sleep(math.random(1, 5))
        end
    end
end

Чтобы записать что-то в шину, необходимо просто вызвать метод put с аргументами.

Приведем код простого писателя:

--
-- BUS PRODUCER
--
local net = require 'net.box'
local uuid = require 'uuid'
local json = require 'json'
local fiber = require 'fiber'

local uri = os.getenv('BUS_URI') or '127.0.0.1:3301'

local bus = net.connect(uri)

math.randomseed(os.time())
while true do
    bus:call('put', {
        {
            uuid = uuid.new(),
            ts = os.time(),
            temperature = math.random(15, 20)
        }
    })
    fiber.sleep(math.random(1, 2))
end

Запускаем шину:

tarantool bus.lua

Запускаем читателя (можно несколько в разных сессиях):

tarantool consumer.lua

Запускаем писателя:

tarantool producer.lua

Таким образом получили минимальную полноценную шину событий на Tarantool.

Почему именно Tarantool


Есть несколько причин, по котором мы предпочли Tarantool другим решениям, например RabbitMQ.

  1. Наличие у команды компетенций и собственных наработок.
  2. Высокопроизводительное In-memory-решение.
  3. Возможность писать код, который работает вместе с данными.
  4. Транзакционность в смысле ACID.
  5. Возможность реализовать структуры данных различной сложности и связи между ними. Вторичные индексы также удобны. Близко к тому, как реализуются схемы в реляционных БД.
  6. Возможность создания своих решений (очереди, Pubsub, агрегаторы, роутеры и другие) с высокой степенью адаптации под конкретные задачи.

Что получили в итоге


На первый взгляд, уходя от «зоопарка технологий», мы пришли к созданию кастомного решения, что не всегда лучше. Но в нашем случае не все однозначно: у нашей команды (не только у специалистов уровня Senior) были компетенции в Tarantool и наработки внутри IoT-платформы с использованием инструмента. Таким образом, внедрение кастомного решения Tarantool Streams позволило кратно уменьшить требования к знаниям, нужным для поддержки всей платформы.

Поскольку с внедрением Tarantool Streams практически все, что у нас было, превратилось в небольшую шину событий, мы смогли:

  • отказаться от Redis, PostgreSQL (LISTEN/NOTIFY), RabbitMQ, web-хуков и других инструментов — конфигурация сейчас содержит только Tarantool Queue, самописный Tarantool Streams и Hasura;
  • упростить создание консьюмеров;
  • снизить требования к компетенции команды;
  • сократить стоимость поддержки IoT-платформы.

Главное из нашего опыта


Наверняка каждый ИТ-специалист в своей практике сталкивался с «зоопарком технологий»: иногда это вынужденная мера, а иногда — результат бизнесовых и организационных издержек. Но разрастание «зоопарка» и игнорирование его существования неизбежно становится источником проблем для команды и бизнеса: требует много компетенций, внимания, средств. Но это не значит, что от «зоопарка» нельзя уйти. Зачастую изучения загрузки стека и паттернов использования технологий достаточно, чтобы выделить общее и найти инструмент, в том числе самописный, который может закрыть основные сценарии. В нашем случае небольшой кастомный сервис помог отказаться сразу от нескольких БД.

Скачать Tarantool можно на официальном сайте, а получить помощь — в телеграм-чате.

Комментарии (0)