Меня зовут Павел Плетнев, я разработчик в команде кредитных карт в Тинькофф. Хочу поделиться, как можно заранее оптимизировать работу с историей в Camunda или решить проблемы, если вдруг они появились.
Зачем сохранять историю
Одно из преимуществ Camunda — возможность наблюдать за ходом выполнения процесса. Мы используем админку Excamad:
Механизм реализован из коробки: события истории — старт и завершение процесса, история выполнения активити и так далее — сохраняются в базу данных.
Спустя время в таком процессе начинаются проблемы: объем исторических таблиц увеличивается, нагрузка на процессор возрастает и время обработки всех запросов растет.
Нужно искать баланс между временем жизни событий истории и уровнем истории в зависимости от количества запускаемых процессов. Чем больше процессов запускается, тем меньше должно быть событий истории и меньше глубина хранения истории.
История в Camunda — ценный источник информации, но со временем деградация производительности из-за истории будет только расти.И тогда нужно уменьшать глубину хранения или отключать ее полностью.
Попробуем сохранить истории и не потерять в производительности. У нас три цели:
Ничего не менять с точки зрения API для пользователей
Уменьшить нагрузку на БД, очистить диск от исторических данных и освободить процессорное время
-
Потратить минимум ресурсов на разработку нового решения
Экспорт исторических данных в другую систему
В Camunda есть стандартный механизм для настройки обработки событий истории через интерфейс — HistoryEventHandler. Поэтому просто напишем нужную реализацию.
Механизм экспорта может быть любым — http-запросы, брокеры сообщений и так далее. Почему мы используем Kafka:
Kafka поддерживает отказоустойчивость, когда недоступен сервис camunda_history_collector.
Kafka поддерживает асинхронную отправку событий и благодаря этому нет влияния на процесс.
Kafka позволяет контролировать нагрузку на сервис и БД camunda_history_collector за счет ограничения количества партиций.
Kafka упорядочивает сообщения, поэтому сообщения можно обрабатывать в порядке их создания.
Совет: если в событии указать полное имя класса, это поможет при десериализации. Формат данных JSON, в котором будет всего два поля/объекта — название класса и сама сущность.
Пример обработчика истории:
class CamundaHistoryEventHandler(
private val kafkaTemplate: KafkaTemplate<String, HistoryEventDto>
) : HistoryEventHandler {
override fun handleEvent(historyEvent: HistoryEvent) {
kafkaTemplate.sendDefault(
historyEvent.processInstanceId,
HistoryEventDto(
historyEvent.javaClass.canonicalName,
historyEvent
)
)
}
override fun handleEvents(historyEvents: MutableList<HistoryEvent>) {
historyEvents.forEach {
handleEvent(it)
}
}
}
Возможны два варианта реализации: с поддержкой транзакций и без. Работа с транзакциями реализуется через механизм сессий Camunda — Session. Выбор решения зависит от потребностей. Лучше работать с поддержкой транзакций. Но если есть длинные транзакции в самом процессе, подойдет второй вариант.
Можно сразу добавить возможность отключить сохранение истории в БД camunda_runtime. Для этого используйте плагин Camunda:
class KafkaHistoryEventEnginePlugin : ProcessEnginePlugin {
override fun preInit(processEngineConfiguration: ProcessEngineConfigurationImpl) {
processEngineConfiguration.isEnableDefaultDbHistoryEventHandler = false
}
override fun postInit(processEngineConfiguration: ProcessEngineConfigurationImpl?) = Unit
override fun postProcessEngineBuild(processEngine: ProcessEngine?) = Unit
}
Совет: отключить сохранение истории на стороне camunda_runtime можно позже, если нужен плавный переход.
Обработка исторических данных
Данные после экспорта можно записать в логи, сделать отдельное хранилище или отправлять их на почту. Но эти варианты противоречат первой и третьей целям, которые мы закрепили выше.
В Camunda есть готовый механизм для сохранения данных в БД, и его можно переиспользовать — нужно поднять еще одну Camunda и реализовать в ней нужную функциональность. За обработку событий истории отвечает класс DbHistoryEventHandler.
Через Jackson можно десериализовать событие и передать его в обработчик.
Пример десериализации:
fun <T : HistoryEvent> readHistoryEvent(
historyEvent: HistoryEventDto
): T {
val clazz = Class.forName(historyEvent.eventClass) as Class<T>
return objectMapper.treeToValue(historyEvent.eventDto, clazz)
}
Camunda работает через паттерн «Команда». Чтобы вызвать обработчик, нужно обернуть его в отдельную команду и вызывать ее выполнение через CommandExecutor.
class ProcessHistoryEventCommand(
private val historyEvent: HistoryEvent,
private val eventHandler: DbHistoryEventHandler
) : Command<Unit>, Serializable {
override fun execute(
commandContext: CommandContext
) {
eventHandler.handleEvent(historyEvent)
}
}
commandExecutor.execute(
ProcessHistoryEventCommand(
event,
eventHandler
)
)
Настройка редиректа запросов для REST API Camunda
На предыдущих этапах мы настроили экспорт данных и сохранение их в базу. Остается реализовать последнюю цель — ничего не менять для пользователей со стороны API.
Создаем еще один источник данных — отдельный сервис с историей. При этом нужно агрегировать данные из разных систем автоматически. С этой задачей справится почти каждый сервис API gateway. Нужно просто перенаправить исторические запросы из camunda_runtime в новый сервис camunda_history_collector — и тогда для пользователей ничего не изменится.
Вместо заключения
Вместо подведения итогов расскажу о преимуществах выбранного решения:
В основной БД camunda_runtime хранятся только runtime-данные. Благодаря этому снижается нагрузка на железо — процессор не обрабатывает исторические запросы, и объем занятого пространства на диске не возрастает.
Поддерживается агрегация истории в одном сервисе — camunda_history_collector для разных camunda_runtime.
В camunda_runtime можно управлять временем жизни для событий истории, но ttl задается глобально или на уровне определенного процесса. В camunda_history_collector можно указывать ttl для разных типов событий, когда не вся история нужна в дальнейшем. Например, историю выполнения job`ов хранить пять дней, а историю выполнения процесса — 30 дней.
Работоспобность camunda_history_collector не влияет на работу camunda_runtime.
Мы опубликовали нашу библиотеку на Github для написания делегатов в декларативном стиле camunda-delegator-lib. Она помогает писать меньше бойлерплейта и делает код более читаемым.
Если вы хотите задать вопрос или предложить тему для обсуждения — добро пожаловать в комментарии или в наш GitHub.