Меня зовут Павел Плетнев, я разработчик в команде кредитных карт в Тинькофф. Хочу поделиться, как можно заранее оптимизировать работу с историей в Camunda или решить проблемы, если вдруг они появились.

Зачем сохранять историю

Одно из преимуществ Camunda — возможность наблюдать за ходом выполнения процесса. Мы используем админку Excamad:

История вымышленного процесса в Excamad
История вымышленного процесса в Excamad

Механизм реализован из коробки: события истории — старт и завершение процесса, история выполнения активити и так далее — сохраняются в базу данных.

Примерная схема хранения данных в Camunda
Примерная схема хранения данных в Camunda

Спустя время в таком процессе начинаются проблемы: объем исторических таблиц увеличивается, нагрузка на процессор возрастает и время обработки всех запросов растет.

Нужно искать баланс между временем жизни событий истории и уровнем истории в зависимости от количества запускаемых процессов. Чем больше процессов запускается, тем меньше должно быть событий истории и меньше глубина хранения истории.

История в Camunda — ценный источник информации, но со временем деградация производительности из-за истории будет только расти.И тогда нужно уменьшать глубину хранения или отключать ее полностью.

Попробуем сохранить истории и не потерять в производительности. У нас три цели:

  1. Ничего не менять с точки зрения API для пользователей

  2. Уменьшить нагрузку на БД, очистить диск от исторических данных и освободить процессорное время

  3. Потратить минимум ресурсов на разработку нового решения

    Три этапа, которые помогут сохранить истории и не потерять производительность
    Три этапа, которые помогут сохранить истории и не потерять производительность

Экспорт исторических данных в другую систему

В Camunda есть стандартный механизм для настройки обработки событий истории через интерфейс — HistoryEventHandler. Поэтому просто напишем нужную реализацию.

Механизм экспорта может быть любым — http-запросы, брокеры сообщений и так далее. Почему мы используем Kafka:

  1. Kafka поддерживает отказоустойчивость, когда недоступен сервис camunda_history_collector.

  2. Kafka поддерживает асинхронную отправку событий и благодаря этому нет влияния на процесс.

  3. Kafka позволяет контролировать нагрузку на сервис и БД camunda_history_collector за счет ограничения количества партиций.

  4. Kafka упорядочивает сообщения, поэтому сообщения можно обрабатывать в порядке их создания.

Совет: если в событии указать полное имя класса, это поможет при десериализации. Формат данных JSON, в котором будет всего два поля/объекта — название класса и сама сущность.

Пример обработчика истории:

class CamundaHistoryEventHandler(
    private val kafkaTemplate: KafkaTemplate<String, HistoryEventDto>
) : HistoryEventHandler {
     override fun handleEvent(historyEvent: HistoryEvent) {
        kafkaTemplate.sendDefault(
            historyEvent.processInstanceId,
            HistoryEventDto(
                historyEvent.javaClass.canonicalName,
                historyEvent
            )
        )
    }
 
    override fun handleEvents(historyEvents: MutableList<HistoryEvent>) {
        historyEvents.forEach {
            handleEvent(it)
        }
    }
}

Возможны два варианта реализации: с поддержкой транзакций и без. Работа с транзакциями реализуется через механизм сессий Camunda — Session. Выбор решения зависит от потребностей. Лучше работать с поддержкой транзакций. Но если есть длинные транзакции в самом процессе, подойдет второй вариант.

Можно сразу добавить возможность отключить сохранение истории в БД camunda_runtime. Для этого используйте плагин Camunda:

class KafkaHistoryEventEnginePlugin : ProcessEnginePlugin {
    override fun preInit(processEngineConfiguration: ProcessEngineConfigurationImpl) {
        processEngineConfiguration.isEnableDefaultDbHistoryEventHandler = false
    }
 
    override fun postInit(processEngineConfiguration: ProcessEngineConfigurationImpl?) = Unit
 
    override fun postProcessEngineBuild(processEngine: ProcessEngine?) = Unit
}

Совет: отключить сохранение истории на стороне camunda_runtime можно позже, если нужен плавный переход.

Схема экспорта исторических данных, результат первого этапа
Схема экспорта исторических данных, результат первого этапа

Обработка исторических данных

Данные после экспорта можно записать в логи, сделать отдельное хранилище или отправлять их на почту. Но эти варианты противоречат первой и третьей целям, которые мы закрепили выше.

В Camunda есть готовый механизм для сохранения данных в БД, и его можно переиспользовать — нужно поднять еще одну Camunda и реализовать в ней нужную функциональность. За обработку событий истории отвечает класс DbHistoryEventHandler.

Через Jackson можно десериализовать событие и передать его в обработчик. 

Пример десериализации:

fun <T : HistoryEvent> readHistoryEvent(
        historyEvent: HistoryEventDto
): T {
     val clazz = Class.forName(historyEvent.eventClass) as Class<T>
     return objectMapper.treeToValue(historyEvent.eventDto, clazz)
}

Camunda работает через паттерн «Команда». Чтобы вызвать обработчик, нужно обернуть его в отдельную команду и вызывать ее выполнение через CommandExecutor.

class ProcessHistoryEventCommand(
    private val historyEvent: HistoryEvent,
    private val eventHandler: DbHistoryEventHandler
) : Command<Unit>, Serializable {
 
    override fun execute(
        commandContext: CommandContext
    ) {
        eventHandler.handleEvent(historyEvent)
    }
}
 
commandExecutor.execute(
    ProcessHistoryEventCommand(
       event,
       eventHandler
    )
)
Схема процесса обработки исторических данных, результат второго этапа
Схема процесса обработки исторических данных, результат второго этапа

Настройка редиректа запросов для REST API Camunda

На предыдущих этапах мы настроили экспорт данных и сохранение их в базу. Остается реализовать последнюю цель — ничего не менять для пользователей со стороны API.

Создаем еще один источник данных — отдельный сервис с историей. При этом нужно агрегировать данные из разных систем автоматически. С этой задачей справится почти каждый сервис API gateway. Нужно просто перенаправить исторические запросы из camunda_runtime в новый сервис camunda_history_collector — и тогда для пользователей ничего не изменится.

Схема настройки редиректа исторических запросов для REST API Camunda, результат третьего этапа
Схема настройки редиректа исторических запросов для REST API Camunda, результат третьего этапа

Вместо заключения

Вместо подведения итогов расскажу о преимуществах выбранного решения:

  1. В основной БД camunda_runtime хранятся только runtime-данные. Благодаря этому снижается нагрузка на железо — процессор не обрабатывает исторические запросы, и объем занятого пространства на диске не возрастает.

  2. Поддерживается агрегация истории в одном сервисе — camunda_history_collector для разных camunda_runtime.

  3. В camunda_runtime можно управлять временем жизни для событий истории, но ttl задается глобально или на уровне определенного процесса. В camunda_history_collector можно указывать ttl для разных типов событий, когда не вся история нужна в дальнейшем. Например, историю выполнения job`ов хранить пять дней, а историю выполнения процесса — 30 дней.

  4. Работоспобность camunda_history_collector не влияет на работу camunda_runtime.


Мы опубликовали нашу библиотеку на Github для написания делегатов в декларативном стиле camunda-delegator-lib. Она помогает писать меньше бойлерплейта и делает код более читаемым.

Если вы хотите задать вопрос или предложить тему для обсуждения — добро пожаловать в комментарии или в наш GitHub.

Комментарии (0)