
Меня зовут Павел Плетнев, я разработчик в команде кредитных карт в Тинькофф. Хочу поделиться, как можно заранее оптимизировать работу с историей в Camunda или решить проблемы, если вдруг они появились.
Зачем сохранять историю
Одно из преимуществ Camunda — возможность наблюдать за ходом выполнения процесса. Мы используем админку Excamad:

Механизм реализован из коробки: события истории — старт и завершение процесса, история выполнения активити и так далее — сохраняются в базу данных. 

Спустя время в таком процессе начинаются проблемы: объем исторических таблиц увеличивается, нагрузка на процессор возрастает и время обработки всех запросов растет.
Нужно искать баланс между временем жизни событий истории и уровнем истории в зависимости от количества запускаемых процессов. Чем больше процессов запускается, тем меньше должно быть событий истории и меньше глубина хранения истории.
История в Camunda — ценный источник информации, но со временем деградация производительности из-за истории будет только расти.И тогда нужно уменьшать глубину хранения или отключать ее полностью.
Попробуем сохранить истории и не потерять в производительности. У нас три цели:
Ничего не менять с точки зрения API для пользователей
Уменьшить нагрузку на БД, очистить диск от исторических данных и освободить процессорное время
- 
Потратить минимум ресурсов на разработку нового решения

Три этапа, которые помогут сохранить истории и не потерять производительность  
Экспорт исторических данных в другую систему
В Camunda есть стандартный механизм для настройки обработки событий истории через интерфейс — HistoryEventHandler. Поэтому просто напишем нужную реализацию.
Механизм экспорта может быть любым — http-запросы, брокеры сообщений и так далее. Почему мы используем Kafka:
Kafka поддерживает отказоустойчивость, когда недоступен сервис camunda_history_collector.
Kafka поддерживает асинхронную отправку событий и благодаря этому нет влияния на процесс.
Kafka позволяет контролировать нагрузку на сервис и БД camunda_history_collector за счет ограничения количества партиций.
Kafka упорядочивает сообщения, поэтому сообщения можно обрабатывать в порядке их создания.
Совет: если в событии указать полное имя класса, это поможет при десериализации. Формат данных JSON, в котором будет всего два поля/объекта — название класса и сама сущность.
Пример обработчика истории:
class CamundaHistoryEventHandler(
    private val kafkaTemplate: KafkaTemplate<String, HistoryEventDto>
) : HistoryEventHandler {
     override fun handleEvent(historyEvent: HistoryEvent) {
        kafkaTemplate.sendDefault(
            historyEvent.processInstanceId,
            HistoryEventDto(
                historyEvent.javaClass.canonicalName,
                historyEvent
            )
        )
    }
 
    override fun handleEvents(historyEvents: MutableList<HistoryEvent>) {
        historyEvents.forEach {
            handleEvent(it)
        }
    }
}
Возможны два варианта реализации: с поддержкой транзакций и без. Работа с транзакциями реализуется через механизм сессий Camunda — Session. Выбор решения зависит от потребностей. Лучше работать с поддержкой транзакций. Но если есть длинные транзакции в самом процессе, подойдет второй вариант.
Можно сразу добавить возможность отключить сохранение истории в БД camunda_runtime. Для этого используйте плагин Camunda:
class KafkaHistoryEventEnginePlugin : ProcessEnginePlugin {
    override fun preInit(processEngineConfiguration: ProcessEngineConfigurationImpl) {
        processEngineConfiguration.isEnableDefaultDbHistoryEventHandler = false
    }
 
    override fun postInit(processEngineConfiguration: ProcessEngineConfigurationImpl?) = Unit
 
    override fun postProcessEngineBuild(processEngine: ProcessEngine?) = Unit
}
Совет: отключить сохранение истории на стороне camunda_runtime можно позже, если нужен плавный переход.

Обработка исторических данных
Данные после экспорта можно записать в логи, сделать отдельное хранилище или отправлять их на почту. Но эти варианты противоречат первой и третьей целям, которые мы закрепили выше.
В Camunda есть готовый механизм для сохранения данных в БД, и его можно переиспользовать — нужно поднять еще одну Camunda и реализовать в ней нужную функциональность. За обработку событий истории отвечает класс DbHistoryEventHandler.
Через Jackson можно десериализовать событие и передать его в обработчик.
Пример десериализации:
fun <T : HistoryEvent> readHistoryEvent(
        historyEvent: HistoryEventDto
): T {
     val clazz = Class.forName(historyEvent.eventClass) as Class<T>
     return objectMapper.treeToValue(historyEvent.eventDto, clazz)
}
Camunda работает через паттерн «Команда». Чтобы вызвать обработчик, нужно обернуть его в отдельную команду и вызывать ее выполнение через CommandExecutor.
class ProcessHistoryEventCommand(
    private val historyEvent: HistoryEvent,
    private val eventHandler: DbHistoryEventHandler
) : Command<Unit>, Serializable {
 
    override fun execute(
        commandContext: CommandContext
    ) {
        eventHandler.handleEvent(historyEvent)
    }
}
 
commandExecutor.execute(
    ProcessHistoryEventCommand(
       event,
       eventHandler
    )
)

Настройка редиректа запросов для REST API Camunda
На предыдущих этапах мы настроили экспорт данных и сохранение их в базу. Остается реализовать последнюю цель — ничего не менять для пользователей со стороны API.
Создаем еще один источник данных — отдельный сервис с историей. При этом нужно агрегировать данные из разных систем автоматически. С этой задачей справится почти каждый сервис API gateway. Нужно просто перенаправить исторические запросы из camunda_runtime в новый сервис camunda_history_collector — и тогда для пользователей ничего не изменится.

Вместо заключения
Вместо подведения итогов расскажу о преимуществах выбранного решения:
В основной БД camunda_runtime хранятся только runtime-данные. Благодаря этому снижается нагрузка на железо — процессор не обрабатывает исторические запросы, и объем занятого пространства на диске не возрастает.
Поддерживается агрегация истории в одном сервисе — camunda_history_collector для разных camunda_runtime.
В camunda_runtime можно управлять временем жизни для событий истории, но ttl задается глобально или на уровне определенного процесса. В camunda_history_collector можно указывать ttl для разных типов событий, когда не вся история нужна в дальнейшем. Например, историю выполнения job`ов хранить пять дней, а историю выполнения процесса — 30 дней.
Работоспобность camunda_history_collector не влияет на работу camunda_runtime.
Мы опубликовали нашу библиотеку на Github для написания делегатов в декларативном стиле camunda-delegator-lib. Она помогает писать меньше бойлерплейта и делает код более читаемым.
Если вы хотите задать вопрос или предложить тему для обсуждения — добро пожаловать в комментарии или в наш GitHub.