Киберпанк 2077 уже наступил. Уровень развития технологий — это почти полное воплощение в реальность мечтаний писателей-фантастов. Умные устройства, «интеллектуальная» среда обитания, автоматизация ручного труда и управление техникой сквозь время и пространство. Полное торжество кибернетики, отложенного старта и дистанционного управления.

Эти «воспоминания о будущем» можно найти как у отечественных писателей, так и у зарубежных. Братья Стругацкие, Сергей Снегов, Кир Булычёв, Рей Брэдбери, Артур Кларк, Станислав Лем предсказали нам то, что мы называем Internet of Things, интернет вещей.

Технологии мотивируют бизнесменов, инженеров, изобретателей и мечтателей придумывать новые задачи и продукты. А новые продукты требуют соответствующих технологий. Этакий замкнутый круг!

Я расскажу о том, как компания Ready For Sky применила Tarantool, чтобы воплотить «воспоминания о будущем» у вас дома.

Чем мы занимаемся


Мы делаем умную бытовую технику. Слово «умная» предполагает, что техникой управляют разными способами, не только с помощью кнопок на приборах. Например, через мобильное приложение, а для любителей громко командовать есть возможность управлять чайниками и лампочками голосом.

Умные приборы могут управлять и друг другом, по сценариям и без участия человека. Срабатывает датчик открытия двери — включается кондиционер или тёплый пол. Чтобы это работало быстро и надёжно, пользователи были довольны, а разработчики (мы) не страдали — мы (разработчики) используем Tarantool. Какие задачи решает Tarantool и зачем он нам так нужен — главный вопрос, на который я и пришёл ответить.

Как всё начиналось




Есть сервис на Python3 и Tornado (назовём его Ретранслятор). Он принимает запросы от голосовых помощников Yandex, Google, Mail.ru, МТС и других. И отдаёт им ответы о состоянии приборов: включился ли чайник, стартовала ли программа мультиварки, и т.д. Ретранслятор превращает запрос, скажем, от Алисы в бинарные команды для приборов и сохраняет их в Redis. Сервис ЦУП — Центр Управления Полётами — выгребает это через Redis’овский pubsub. ЦУП написан на Java/Kotlin + RxKotlin. Он пересылает команды железякам через MQTT-брокер, по определённым топикам. И опять же через брокер принимает от них ответы и отчеты о состоянии.

У ЦУПА три основные задачи:

  • контроль за состоянием устройств;
  • управление соединениями между устройствами и шлюзами (гейтвеями), которые подключены к брокеру;
  • пересылка команд и ответов на них.

Вполне себе диспетчерская работа. Как мы видим на схеме, Ретранслятор также слушает Redis через подписку на pubsub-канал обо всех событиях. Он же считывает ответы на команды и состояние устройств, чтобы потом передать информацию голосовым сервисам в удобном для них виде.

Вроде бы схема простая, но выглядит сложновато. Redis как шина передачи данных через pubsub и как кеш для хранения состояния приборов; ЦУП, который слушает одновременно и Redis, и MQTT-брокер; ну и Ретранслятор, который слушает Redis. Уже страшно!

И да, Houston, we have a problem!

Какие появились проблемы


Первая проблема пришла из ЦУПа. Этот сервис написан с применением фреймворка RxKotlin для реактивного программирования. И он не мог справиться даже с четырьмя сотнями сообщений в секунду. У него утекала память, причины мы так и не установили. Отлаживать этот сервис было весьма муторно: любая попытка понять, что произошло с устройством в физическом мире и его представлением на стороне сервиса упиралась в то, что надо было как-то получить доступ к части состояния ЦУПа, а это не было изначально предусмотрено.



Из-за утечек памяти приходилось перезапускать систему по cron’у раз в несколько суток. И это особенно неприятно, потому что приводило к потере состояний приборов, постоянному восстановлению сессий и соединений с приборам и неуправляемой нагрузке на MQTT-брокер. А значит, пользователи люто недовольны. Это же у них Алиса чайник не включает, свет не переключает, да и пылесос голосом не отправить на кухню разбираться с внештатной ситуацией! Это не то будущее, которого мы хотим!

Последней каплей, честно признаёмся, стало то, что у нас не было хороших компетенций по Java и Kotlin для backend’а… Однако не время для саркастических ухмылок, господа! Всё впереди.

Да, мы прекрасно понимаем наших коллег, которые нежно любят Java и Kotlin и с ума сходят от реактивного программирования. Мы в курсе, что Kotlin и Java, как правильные грелки, могут порвать любого тузика. Согласны с тем, что Reactive-фреймворки хорошо подходят для программирования интерфейсов в web и мобильных приложениях, и что они просто прекрасны везде, где есть взаимодействие с человеком через UI. Но не на бэкенде! Там это прекращается в боль.

Вторая проблема — это редкие, но внезапные потери сообщений из Redis. Опыт эксплуатации показал, что Redis растеряйка тот ещё: его pub-sub механизм не самый надежный, так как клиент, подписываясь на каналы в Redis’е и получая из них сообщения, не отсылает назад никаких подтверждений, а Redis их не повторяет, что порой приводит к потерям.

Третья проблема: в Redis’е хранилось не только состояние приборов, команды и ответы на них. Там лежали и связи между пользователями и приборами (какие приборы за какими пользователями закреплены), связи между шлюзами и самими приборами.

Всё это надо было быстро вытаскивать. В Redis’е это выродилось в кучу дополнительных ключей, которые играли роль индексов. И тоже распухала память: Redis мог занять всю доступную память сервера. На одно устройство и один гейтвей приходилось что-то около 10 ключей, это в среднем. Итого на 1000 приборов в Redis’е уже было 10 000 ключей. Числа очень примерные, но дают понять масштаб.

А после перезагрузки ЦУПа по cron’у приходилось чистить память и в нём.

Всё бы ничего, жили бы себе дальше, недовольные пользователи со временем привыкают, инфраструктура, хм… тоже привыкает. Но тут на горизонте замаячила грозовая туча. Ведь мы стремимся продать как можно больше умной техники. Всячески захватить мир. А там уже превратить его в утопию в стиле Стругацких, либо в антиутопию в стиле Чёрного Зеркала. А это значит, что нужно больше приборов. Больше шлюзов для управления умными приборами через Интернет. Больше пользователей-любителей киберпанка, которые хотят командовать голосом. И как следствие — сильная нагрузка на инфраструктуру.

Полный коллапс?

Как мы узнали про Tarantool


Стали думать. Начинали писать решение на Python3 и Asyncio, и оно даже начинало работать. Но этого оказалось мало, пришлось искать дальше. В процессе поиска узнали, что Mail.ru использует Tarantool, который выдерживает просто космические нагрузки — местами до 500 000 запросов в секунду, а то и больше. Хм! Уже интересно.

Дальше больше — с помощью Tarantool компания Nokia построила целую систему умного дома [1, 2]. Ого! Это уже совсем близко! А кто еще использует Tarantool? Оказалось, что это прямо-таки горячая тема! Почитали документацию и решили попробовать.



Что нам понравилось, кроме наличия проверенных кейсов?


Мы увидели, что Tarantool можно использовать как сервер приложений, объединённый с базой данных. Можно писать код, который плотно работает с данными, ловит события, сохраняет их, что-то удаляет, выполняет процессы. Всё это с возможностью получать и сохранять данные тут же, прямо на месте — очень удобно!

Можно подключиться к экземпляру Tarantool, получить консоль с REPL. Это весьма удобно для отладки, администрирования и разбора полётов.

Встроенные инструменты для шардирования, репликации, и очень крутой, развитый инструмент Cartridge. В нем уже есть необходимые примитивы, в том числе и удобное разделение сервисов по ролям и с кластеризацией. На голом Python нам пришлось бы попотеть.

Всё это нас окончательно убедило, у нас загорелись глаза, и мы сели писать код.

Как мы писали код


Мы не решились разом взять Cartridge с шардированием и репликацией. Решили начать с самого малого и делать всё поэтапно. Да и сроки были небольшие: около 4-5 месяцев, с довольно сжатым бюджетом.

И мы написали новый транспортный сервис с нуля на голом Tarantool! На этом этапе практически сразу выкинули Redis. Ибо незачем, в Tarantool есть всё и даже больше!

Впрочем, пришлось сильно попотеть, потому что нам встретилось много мелких неприятностей. Например, в мире open-source довольно мало библиотек для Lua, а в особенности для работы с MQTT. Родной тарантуловский коннектор не подходил: был нужен протокол MQTT версии 5. Так что пришлось допилить небольшую библиотечку luamqtt. Огромное спасибо её создателю Александру Киранову!

Вот пример использования luamqtt под Tarantool:

local fiber = require 'fiber'
local log = require 'log'

local mqtt = require 'mqtt'
local ioloop  = require 'mqtt.ioloop'
local connector = require 'app.tnt_mqtt_connector'

local M = {}
local client
ioloop = ioloop.get(true, { sleep_function = fiber.sleep })

function M.subscribe(topic, qos)
    client:subscribe {
        topic = topic,
        qos = qos or 0,
        no_local = true
    }
end

function M.unsubscribe(topic)
    client:unsubscribe { topic = topic }
end

function M.publish(topic, payload, qos)
    client:publish({
        topic = topic,
        payload = payload,
        qos = qos or 0
    })
end

function M.start(opts)
    client = mqtt.client {
        id = opts.client_id,
        uri = opts.host .. ':' .. tostring(opts.port),
        clean = opts.clean or true,
        connector = connector,
        version = mqtt.v50,
        keep_alive = opts.keep_alive or 60,
        reconnect = opts.reconnect or true,
        username = opts.user,
        password = opts.pass
    }

    client:on {
        connect = function(connack)
            if connack.rc ~= 0 then
                return log.error(
                    'mqtt error: error = %s, ack = %s',
                    connack:reason_string(),
                    connack
                )
            end
            log.info('mqtt: connected')
        end,
        message = handle_msg,
        error = handle_error
    }

    ioloop:add(client)

    fiber.create(function()
        while true do
            ioloop:iteration()
            fiber.yield()
        end
    end)
end

return M 

Для компактности примера в коде не показаны функции handle_msg и handle_error, но их назначение в целом понятно. Отдельно стоит упомянуть про модуль tnt_mqtt_connector, он появился в результате доработки напильником коннектора, который был уже в библиотеке для работы через LuaSocket. То есть, библиотека довольно удобная и простая, можно использовать в разных Lua-средах.

Следующий момент, который пришлось обдумывать: как обрабатывать сообщения и реагировать на них? Писать код, где каждый пришедший запрос создает новый файбер, который что-то там делает, а для таймаута выполняет fiber.sleep(t), не совсем удобно. Это подходит больше для HTTP-сервисов, и для этого есть fiber pool. Но не для сетевых сервисов, которые занимаются преобразованием и маршрутизацией сообщений, и где работа не всегда укладывается в схему «запрос-ответ». Да и при неконтролируемом возрастании количества файберов экземпляр Tarantool мог терять в производительности. Чтобы проверить это утверждение, мы провели небольшой эксперимент: создали большое количество файберов, которые засыпают на блокировке, а затем замерили реальное время, за которое отработала fiber.sleep главного файбера. И внезапно обнаружили ошибку!

local fiber = require 'fiber'
local clock = require 'clock'
local log = require 'log'

local LIMIT = 32000

local lock = fiber.cond()
local counter = 0

local t1 = clock.realtime()
for i = 1, LIMIT do
   fiber.create(function()
       lock:wait()
       counter = counter + 1
   end)
end
local t2 = clock.realtime()

log.info('fibers created, took %s seconds', t2 - t1)

t1 = clock.realtime()
fiber.sleep(10)
t2 = clock.realtime()
log.info('real sleep time: %s seconds', t2 - t1)

t1 = clock.realtime()
lock:broadcast()
fiber.yield()
t2 = clock.realtime()

log.info('fibers unlocked, counter is %s; took %s seconds', counter, t2 - t1)

Сперва посмотрим, как отработает скрипт при значении переменной LIMIT = 32000.

Пока что всё работает, никакой ошибки, мы получаем примерную точность fiber.sleep():

fibers created, took 0.32585644721985 seconds
real sleep time: 9.6747360229492 seconds
fibers unlocked, counter is 32000; took 0.026816844940186 seconds

Но вот давайте попробуем создать больше файбров… Например, LIMIT = 50000. Результат немного обескураживает:

SystemError fiber mprotect failed: Cannot allocate memory
SystemError fiber mprotect failed: Cannot allocate memory
fatal error, exiting the event loop

Попытки изменить настройки памяти (memtx_memory) ни к чему не привели. Это наблюдалось в версии Tarantool 2.5, но также повторяется и в версии 2.6. На самом деле, вполне логичная ошибка: память не резиновая (хотя, конечно, маловато файберов создалось, они довольно легковесные). И поэтому эксперимент является предупреждением и подтверждением того, что бесконтрольно создавать кучу файберов, которые могут массово уходить в спячку и занимать память, не самый лучший вариант. И если всё же это приходится делать, то желательно оборачивать вызов fiber.create в pcall.

Пришлось срочно перестроить своё мышление с синхронного стиля на конечные автоматы. А как делать таймауты для определенных состояний конечного автомата, мы подсмотрели в библиотеках самого Tarantool: queue и expirationd. Так что файберы — крутая штука, но использовать их надо немного и с умом!

Таймауты организуются просто. Для каждого устройства мы храним в спейсе состояние и время, когда мы в последний раз послали сообщение. И если состояние предполагает, что от этого прибора ожидается ответ (например, на ping), то по индексу, который построен как раз по двум полям (state и ts), мы вытаскиваем все устройства, для которых надо запускать логику таймаута. Вот как это примерно выглядит (опустим некоторые детали, для ясности):

local states = {
   OFFLINE = 0,
   ONLINE = 1,
   OPENING = 2,
   READY = 3,
   WAITING = 4
}

-- create space for user devices
local devices = box.schema.space.create('devices', {
    engine = 'memtx',
    if_not_exists = true
})
devices:format {
   {name = 'id', type = 'string'},
   {name = 'state', type = 'number'},
   {name = 'ts', type = 'number', is_nullable = true}
}
-- create index for timeouts and states
devices:create_index('pulse', {
    parts = {'state', 'ts'},
    type = 'tree',
    unique = false,
    if_not_exists = true
})


local function auto_yield()
   if not ticks then
       ticks = 512
   end
   local count = 0
   return function(...)
       count = count + 1
       if count % ticks == 0 then
           fiber.yield()
       end
       return ...
   end
end

local function get_expired(state, timeout)
   local now = math.floor(clock.realtime())
   return box.space.devices.index.pulse
       :pairs({states[state], now - timeout},
              {iterator = box.index.LT})
       :map(auto_yield())
       :take_while(function(device)
           return device.state == states[state]
        end)
       :filter(function(device)
           return now - device.ts > timeout
        end)
end

Функция get_expired вызывается с периодичностью в полсекунды внутри файбера. Всё достаточно просто! Может возникнуть вопрос: а что за функция auto_yield()? Она нужна, чтобы прерывать итераторы и передавать управление другим потокам исполнения. Итераторы в Tarantool хоть и мощная вещь, но у них есть одна особенность: когда мы итерируем по большому куску данных, мы рискуем заблокировать на какое-то время выполнение всех остальных файберов и процессов Tarantool, поскольку итераторы не отдают управление сами, это надо делать вручную. Поэтому придумали такой хитрый трюк с auto_yield, который вызывает fiber.yield каждые 512 записей.

Мы выкинули Redis, поэтому должны были переделать сервис, который обрабатывал запросы от голосовых ассистентов, чтобы он работал напрямую с Tarantool. Для работы из Python/Asyncio мы воспользовались библиотекой asynctnt, которую нам отрекомендовали в чате сообщества Tarantool и которая понравилась по возможностям. Еще взяли обертку для этой же библиотеки, но для работы с queue. Всё решаемо. Мы получили компактный, простой сервис для управления умными приборами. И он уже в production!

Итоги




Нам не нужно каждую ночь перезагружать сервис по cron’у и беспокоиться из-за утечек памяти. Не нужно переживать, что какой-то пользователь в этот момент говорит Алисе: «Поставь чайник!».

Проблем у пользователей стало меньше. Мы уже гораздо спокойнее разбираемся с ними, так как ничто не мешает подключиться к экземпляру Tarantool и тут же локализовать некорректное состояние, прибить зависший файбер, понять, что произошло и как это исправить.

Система стала более отзывчивой и быстрой, так как нет промежуточной прослойки в виде Redis: всё теперь работает напрямую, нет лишних пересылок сообщений между сервисами, повысилась скорость выполнения запросов. Управление техникой через голосовой помощник стало стабильнее, а тестировщики и ребята из технической поддержки спят гораздо спокойнее. Разработчики тоже :)

Старый ЦУП мог запросто съесть 9 гигабайт памяти сервера за несколько суток, а среднюю нагрузку на процессор он давал в районе 40 % (на все четыре ядра). И это при количестве одновременно подключенных устройств в районе трёх тысяч. Сейчас мы немного выросли, и максимальное количество устройств в онлайне уже приближается к четырём тысячам, но важно другое: новый ЦУП на Tarantool загружает процессор на 7 % и по памяти не вылезает за пределы одного гигабайта даже в пиковых нагрузках, когда много пользователей одновременно управляют своими чайниками и частота обрабатываемых сообщений от приборов достигает 1200 пакетов в секунду. Также стоит учесть запросы со стороны голосовых помощников, что добавляет нам еще до 100 пакетов в секунду, итого 1300 RPS. Пока немного, но задел есть.

А самое, самое-то главное: прекратились жалобы от пользователей по поводу неуправляемых устройств в голосовых помощниках. Их практически нет, а если и есть, то они уже связаны с другими проблемами, вроде зашумленного эфира, когда Bluetooth не желает хорошо работать :) А вот Алиса теперь выполняет команды чётко!

Своей первоначальной цели мы достигли. Опробовали Tarantool в самой простой его конфигурации, вывели это дело в production, и оно просто работает! А вот что у нас впереди?

А впереди самое интересное. Мы ещё не делали кластеризацию и шардирование транспортного сервиса. Мы пока не используем Cartridge. Впереди нас ждет миграция в Tarantool части микросервисов, написанных на Python/Tornado, и там уже Cartridge и кластеризация нам понадобятся.

Вот таким образом мы и наш мир стали еще на шаг ближе к одному из возможных состояний: Мир Полудня, где все умные роботы добрые и помогают человеку, или мир Чёрного Зеркала, где техника, нейтральная по сути своей, используется не самым добрым образом. Что будет — зависит от нас самих :)

Ссылки


  1. Glial — система управления IoT-устройствами.
  2. Nokia создала ПО для программирования интернета вещей на базе российской СУБД.
  3. Скачать Tarantool можно на официальном сайте, а получить помощь — в Telegram-чате.