Всем привет! Я Илья, бэкенд-разработчик в Яндекс Телемосте — сервисе для организации видеовстреч и онлайн-трансляций.  

При разработке одной из функций Телемоста нам нужно было решить нетривиальную задачу по запуску частых тасок по расписанию. При этом таски пришлось запускать с посекундной точностью и максимальной отказоустойчивостью — даже небольшой перерыв между запусками вёл к неработоспособности сервиса. О том, как решили эту задачу, расскажу в статье.

Зачем нам понадобился частый запуск тасок

В разработке бэкенда часто приходится запускать какие-то таски по расписанию. Одни выполняются раз в сутки или раз в несколько часов — для таких, как правило, не критична точность интервала между запусками — плюс-минус пара минут ничего не решат. А есть таски, которые надо запускать часто, например, каждую секунду. Примером может быть real time система, у которой нужно регулярно проверять работоспособность; или нужен регулярный пересчет какой-либо трудновыполнимой задачи. Например, у вас есть сервис, и на каждый запрос он вычисляет какую-то сложную задачу. Если вы сделаете решение в лоб, то есть будете просто вычислять эту задачу на каждый запрос, то не сможете держать большой RPS и получите избыточную нагрузку на систему. Однако можно раз в N секунд  вычислять значение, сохранять результат, а затем отдавать уже предпосчитанные данные. Таким образом, получится снизить нагрузку на систему и держать большой RPS.

Так вот, при разработке сервиса в Телемосте перед нами встала задача: как обеспечить стабильность решения при высокой частоте запуска тасок — об этом и пойдет речь в статье.

Как работают переговорки в Телемосте

Расскажу про наш крупный проект, о котором дальше пойдет речь в статье: внедрение переговорок в Телемосте. В проекте перед нами стояла цель — сделать так, чтобы в Телемосте можно было принимать звонки из переговорных комнат. Цель такой функциональности — дать пользователям возможность совещаться в конференции, когда вас несколько людей в одной комнате. Причем необходимо поддерживать разное оборудование переговорок. Проект технически сложный, поэтому опишу верхнеуровнево, как все работает под капотом, прежде чем мы перейдем к самой теме статьи.

Как работают переговорки в Телемосте
Как работают переговорки в Телемосте
  1. У нас есть две сущности — переговорка и конференция в Телемосте.
    В переговорке есть просто телевизор — через него нельзя подключаться в конференцию как с ноутбука. А также есть оборудование для IP телефонии: специальный терминал, который выглядит как планшет.

  2. Пользователь заходит в переговорку, набирает номер встречи и звонит в Телемост.

  3. Создается SIP-звонок. SIP — это протокол, который используется в IP-телефонии для установления и завершения звонка.

  4. Далее звонок обрабатывается SIPGW — бэкенд-компонентом в Телемосте, который инициирует SIP-звонок и посылает Create call запрос в TRAM — Telemost Room Agent Manager (далее — TRAM) . TRAM — основной бэкенд-компонент переговорок в Телемосте, который управляет всеми звонками: создает их, сохраняет состояния в базе данных, завершает, а также осуществляет health checks — проверки, что звонок работает правильно.

  5. Появляется еще одна сущность — Room Agent. Это stateful-компонент, обслуживающий звонок: он пересылает media-данные из переговорки в конференцию и в обратную сторону.

  6. TRAM поднимает Room Agent, отправляет ему invite-запрос, а последний отвечает. И ответ пересылается обратно в переговорку. Таким образом, между переговоркой и Room Agent-ом устанавливается media-канал с аудио и видео.

 А Room Agent как обычный клиент Телемоста подключается в конференцию, и между ним и media-сервером также устанавливается media-канал.   

Постановка задачи

Для каждого активного звонка надо регулярно выполнять health checks: что должна делать эта таска?

  1. Находить все активные звонки, которые давно не проверялись

  2. Ставить асинхронные задачи на проверку каждого найденного звонка. Асинхронная задача в свою очередь должна:

    1. Проверять Room Agent

    2. Спрашивать о звонке у Telemost Backend

    3. Проверять оборудование переговорки

  3. Зафетчить текущему воркеру асинхронные задачи

Однако из-за специфики задачи у нас накладываются жесткие требования: 

Запуск каждую секунду. Мы хотим, чтобы реакция на любые изменения была быстрой — порядка десяти секунд. Например, если у Room Agent отвалилась сеть, и пользователи переговорки потеряли связь с Телемостом, ее необходимо как можно скорее заменить, чтобы пользователи смогли продолжить встречу. Этим обусловлено требование частого запуска health check тасков.

Отказоустойчивость и бесперебойный запуск тасок. Также если health check таски не будут посылаться хотя бы минуту, то звонок просто дропнется.

Легковесность. Так как таски запускаются часто, а звонков может быть много, механизм должен быть эффективным. 

Первый подход к задаче: почему текущий фреймворк не справился

Мы реализовали запуск таких тасок каждую секунду на весь TRAM, который состоит из многих инстансов. Сделали это на уже существующем у нас в Яндекс 360 фреймворке Bazinga — планировщике тасок в распределенном кластере. Рассмотрим, как он работает:

  • Контроллер выбирает мастера, который будет выполнять таску, отдает на выполнение и ждет, когда мастер сообщит о выполнении таски.

  • Через секунду снова выбирается мастер, который запускает таску и так далее.

Во время одного из тестовых релизов TRAM произошла интересная история: у нас было еженедельное демо, где мы синкались по проекту переговорок: что нового сделали и так далее. И тут, внезапно у всех отвалились переговорки, причем одновременно. Мы судорожно начали откатывать релиз — проблема была неочевидна, TRAM — stateless-компонент и должен катиться без влияния на текущие звонки. Проблема оказалась в том, что инстанс TRAM начал рестартиться пока на нем выполнялась health check таска. Рестарт занял около шести минут, а контроллер Bazinga продолжал ждать новостей о выполнении этой таски и не запускал новую, думая, что старая все еще выполняется. Так совпало, что какая-то пода взяла выполнять health check таску (которая выполняется меньше секунды), и ее начало рестартить именно в этот момент. Круто, что мы отловили это в тестовом релизе и поняли, что нужно искать другое решение. В общем, мы поняли всю специфичность задачи, что таскам нельзя залипать.

Более того, мы пришли к выводу, что Bazinga не подходит для решения нашей задачи. Почему? Дело в том, что насколько маленький timeout у таски не указывай, Bazinga сможет поставить на выполнение новую не раньше, чем через минуту. А для нас это неприемлемо много. Это необходимо для гарантии at most once, которую дает Bazinga. Менять константу опасно, может разнести все сервисы, которые используют Bazinga — например, Диск. А чинить по-умному долго и дорого, у нас не было на это времени. 

Обзор фреймворков Quartz, Spring ShedLock: как полечить залипание тасок без лишней нагрузки на сервис 

Мы пошли искать альтернативные решения и исследовать готовые фреймворки — например, Quartz или Spring ShedLock.

Quartz — мощный фреймворк планировщика задач с большим, но не нужным для нашей конкретной задачи функционалом. Он подойдет для приложений, где требуется планирование ресурсоемких задач. Quartz занимается распределением нагрузки по нескольким узлам, также он подойдет, когда необходимо учитывать гибкие настройки типа исключения праздничных дней. 

Почему не взяли:

  • Он как и Bazinga подвержен все тем же проблемам с залипанием тасок

  • Требует завести более десяти таблиц в базе данных

  • Не подходит для простых и частых тасок — низкая эффективность из-за многочисленного обращения к базе данных

Spring ShedLock намного проще, чем Quartz. Действительно, он реализует механизм запуска задач, основанный на временных блокировках. Основная цель Spring Shedlock – реализовать распределенный lock на базе данных для синхронизированного запуска тасок на кластере из нескольких инстансов приложения с общей базой данных. Он решает проблему залипания тасок, но…

Почему не взяли:

  • Избыточен для нашей задачи: он отчитывается о выполнении каждой таски, сохраняя состояние в базе данных. Для частых тасок это дает лишнюю нагрузку на сервис, не привнося никаких преимуществ. 

Как мы решили проблему: почему вся соль в thread local timeout

Так мы решили реализовать свой механизм запуска частых, быстрых и очень важных health check тасок для SIP-звонков. Дополнительными требованиями были максимальная эффективность и легковесность механизма, а также то, что кейс одновременного выполнения нескольких тасок будет исключен.

Этот механизм мы назвали Pecker. «‎Почему?», —  спросите вы. Потому что он «долбит» как дятел: часто, точно и бесперебойно.

Теперь разберемся, как именно устроено решение. В базе данных создается очень простая таблица, через которую будет происходить синхронизация запусков тасок:

CREATE TABLE pecker_tasks (
  name TEXT NOT NULL,
  executed_at TIMESTAMPTZ(3) NOT NULL,

  CONSTRAINT pk_pecker_tasks_name PRIMARY KEY (name)
);

Создается абстрактный класс AbstractPeckerTask, наследники которого будут теми самыми pecker-задачами. Каждая такая задача регистрируется в контексте как бин, после чего в таблицу вставляется запись для отслеживания этой задачи, и запускается специальный alarm thread.

@Override
protected void alarmTraced() {
    // Проверяем, пора ли выполнять таску
    if (needExecuteTask()) {
        // Да, пора. Обновили executed_at в базе и сохраняем ThreadLocalTimeout
        ThreadLocalTimeout.Handle tlt = ThreadLocalTimeout.push(timeout);
        try {
            // Выполняем саму бизнес логику таски
            executeTask();
        } catch (ThreadLocalTimeoutException e) {
            log.warn("Thread local timeout occurred during {}", taskName, e);
        } finally {
            // Вытаскиваем ThreadLocalTimeout
            tlt.popSafely();
        }
    }
}

private boolean needExecuteTask() {
    // Возвращаем true, если запись была обновлена. Иначе возвращаем false
    return peckerDao.updateExecutedAtIfTimeToStart(taskName, delay.getMillis()) > 0;
}


// Поток просыпается раз в заданное время и пытается взять задачу на выполнение. 
// Если update вернул единицу, тогда запускается задача, иначе alarm thread засыпает до следующего пробуждения. 
// Вот такой запрос используется для атомарной проверки и обновления записи в базе данных.

@Override
public int updateExecutedAtIfTimeToStart(String name, long delayMillis) {

    // В этом запросе мы находим таску по имени и проверяем, что она не запускалась дольше чем delayMillis. 
    // В таком случае мы выставляем executed_at в текущее время, тем самым блокируя выполнение таски на следующие delayMillis миллисекунд
    @Language("SQL") final String query = """
            update pecker_tasks set executed_at = now()
            where name = :name and now() - executed_at > :delayMillis * interval '1 millisecond'
            """;

    Map<String, Object> params = new HashMap<>();
    params.put("name", name);
    params.put("delayMillis", delayMillis);

    return jdbcTemplate.update(query, params);
}

Гарантия, что в один момент времени будет выполняться не больше одной pecker-задачи, обеспечивается механизмом thread local timeout. Прежде чем запустить саму бизнес-логику таски, в thread local сохраняется максимальное время работы и текущее время. Если задача будет работать указанное в thread local timeout время, то вылетит исключение, которое завершит выполнение pecker-таски. Также мы добавили в Pecker трейсинг на основе Open Telemetry, что нам дало возможность связывать логи из разных компонентов Телемоста, которые относятся к одному запуску Pecker-таски.

Ограничения нашего решения: ретраи, выбор воркера, timeouts

Теперь давайте поговорим про границы применимости Pecker.

В Pecker невозможно выбирать или как-то влиять на выбор воркера, который будет выполнять таску, например, может получиться, что таску будет постоянно брать один и тот же воркер. Это заложено в самой концепции механизма запуска тасок: все воркеры в равной степени пытаются выполнить таск, но достается он только какому-то одному, случайному. Благодаря этому достигается отказоустойчивость, даже если 9 из 10 воркеров выйдут из строя, то таска будет выполняться с заданной частотой на одном оставшемся. И обоснование с другой стороны: из двух равносильных решений надо выбирать более простое. Нам не нужно влиять на возможность выбора воркера в таких задачах, следовательно зачем это делать? Лишнее усложнение.

А еще в Pecker невозможно дать воркеру чуть больше времени на выполнение, если таск оказался сложным — выполнение жестко обрубится по таймауту.

Глобально, у Pecker есть свой круг применения — задачи которые необходимо часто и отказоустойчиво запускать. И в этих задачах воркеры не приоритезированы.

Заключение

Таким образом, мы получили простое, оптимальное и удобное решение для запуска частых и быстрых тасок с гарантией последовательной работы и единственным модифицирующим запросом в базу данных на запуск таски. Наши health check таски заработали безотказно, мы смогли катать релизы бэкенда Телемоста в любое время, и больше не было ни одного сломанного звонка. Результат можно увидеть на гистограмме числа ошибочных завершений звонков из-за отсутствия health check.

*Ошибки возникали не на проде, а в тестовом окружении
*Ошибки возникали не на проде, а в тестовом окружении

Комментарии (4)


  1. Zantiago
    19.08.2024 17:27
    +3

    Этот механизм мы назвали Pecker. «‎Почему?», —  спросите вы. Потому что он «долбит» как дятел: часто, точно и бесперебойно.

    pecker это не совсем дятел ;)

    извините!


    1. ilyagrig2000 Автор
      19.08.2024 17:27

      Это пасхалочка)


  1. Mikaelo
    19.08.2024 17:27
    +2

    А что за механизм "thread local timeout" ? Как это реализовано внутри ?


    1. ilyagrig2000 Автор
      19.08.2024 17:27

      В thread local переменной сохраняется дедлайн работы потока (текущее время + заданный таймаут). Далее для всех сетевых запросов выставляется таймаут равный оставшемуся времени работы потока. Можно сделать удобную реализацию, чтобы thread local timeout автоматически учитывался во всех клиентах и jdbcTemplate. Так же проверки на thread local timeout вручную добавляются в time consuming операциях, например на каждой итерации цикла. Если время работы потока вышло, то выбрасывается ThreadLocalTimeoutException, который завершает работу этого потока. Если запускаются параллельные потоки, то в них прокидывается таймаут равный исходному таймауту минус уже прошедшее время.