Привет! Меня зовут Андрей, я - ведущий разработчик в "РТК ИТ". В этой статье речь пойдет об опыте перехода на паттерн external task в одной из наших систем.
Camunda — это BPM-движок для автоматизации бизнес-процессов. Она представляет собой набор библиотек, которые и позволяют выполнять описанные процессы.
Как мы в проекте работаем с camunda
В задачи проекта входит разбор и маршрутизация заказов с вызовами разных внешних API. Camunda используется в проекте для запуска разных процессов, построения маршрутов процесса с установкой соответствующих статусов, обработкой ответов внешних API с разными кейсами и ожиданием в процессе изменений статусов заказа.
Первоначальная реализация проекта
message broker - очередь реализованная через RabbitMQ;
bpmn-launcher - сервис для работы с процессами (запуск и доставка сообщений конкретному выполнению, чтобы активировать существующую подписку на событие сообщения);
engine-rest - путь к приложению camunda по умолчанию;
business process - запускаемые bpmn процессы;
external system - api внешних систем.
Сервис “bpmn-launcher” который получает из очереди сообщение с id заказа, по данным из заказа определяем какой bpmn процесс нужно запускать, для этого используем метод
POST: /rest/process-definition/key/{key}/start для запуска нужного процесса
Ниже описан выдуманный процесс создания заказа, каждый call activity вызывал свой bpmn процесс
Сall activity (или повторно используемый подпроцесс) позволяет вызывать и другой процесс как часть этого процесса. Он похож на встроенный подпроцесс, но этот процесс является внешним (т. е. хранится как отдельный BPMN) и может вызываться различными процессами.
Script Task - задача сценария, используется для моделирования оценки сценария, например, сценарий написанный на Groovy или JavaScript
Service task - служебная задача представляет рабочий элемент в процессе определенного типа
С развитием проекта появилась необходимость уйти от взаимодействия между camunda и внешними API. Взаимодействие было реализовано с помощью синхронного вызова java-кода. Изначально, когда проектировали систему, общение с внешними API было реализовано через Service Task c Implementation type: Connector и Connector ID: http-connector в camunda с подготовкой тела запроса и разбора ответа с помощью js в Script Task в отдельно вызываемых call activity, и передачей необходимого метода и url.
Плюсы такого решения: быстрое применение изменений в bpmn и быстрый вывод процесса на стенд. Этот вариант хорошо подходит для тестирования новой системы - когда необходимо часто править тело запроса, когда часто меняются требования.
Проблемы с которыми столкнулись в процессе: сложность обработки ответов по API, сложная реализация повторных запросов при неуспешных ответах, необходимо было реализовывать отдельный call activity под каждый процесс и под каждый вызов.
Проанализировав возможные решения данных проблем, пришли к решению внедрять универсальный механизм external task, так как данное решение дает масштабируемую и отказоустойчивую архитектуру и даёт возможность делегировать работу в какой-либо внутренний сервис - адаптер.
External task - позволяет настроить задачи удаленного обслуживания для вашего рабочего процесса.
Внедрение external task в проект
message broker - очередь реализованная через RabbitMQ;
bpmn-launcher - сервис для работы с процессами (запуск и доставка сообщений конкретному выполнению, чтобы активировать существующую подписку на событие сообщения);
camunda-subscriber - сервис для работы с активными топиками в camunda с блокировкой, и отправкой сообщения в rabbit;
camunda-publisher - сервис для работы с заблокированными топиками (выполнение внешней задачи с обновлением переменных в процессе);
adapters - разные сервисы исполнители внешних задач с обращением во внешние системы;
engine-rest - путь к приложению camunda по умолчанию;
business process - запускаемые bpmn процессы;
external system - api внешних систем.
Для внедрения external task нам понадобилось реализовать сервисы которые общаются с camunda, camunda-subscriber и camunda-publisher, и внутренние сервисы-адаптеры для взаимодействия с внешними API.
Так же сделали универсальный вызываемый подпроцесс bpmn. В данный подпроцесс передается переменная, которая затем используется в поле topic у service task. Это позволяет нам не увеличивать количество вызываемых bpmn процессов, и настроить универсальную обработку вызываемого процесса.
Чтобы создать external task как в примере выше, необходимо:
Создать task.
Поменять его тип на service task.
Установить implementation на external.
Указать значение поля topic.
сamunda-subscriber постоянно использует метод POST /fetchAndLock, чтобы получить список задач по topicName и закрепить их за собой (так как может быть поднято несколько подов данного сервиса).
async def fetch_and_lock(self):
body = {
"workerId": self.worker_id,
"maxTasks": config.max_tasks,
"topics": self._get_topics(),
}
return await self.httpx_client.send_data('/fetchAndLock', body)
def _get_topics(self):
topics = []
for topic in config.topic_names:
topics.append({
"topicName": topic,
"lockDuration": self.config["lockDuration"], # How much time the worker thinks he needs to process the task
})
return topics
Затем сообщение отправляется в очередь для адаптеров с соответствующими id & workerId(данные атрибуты мы получаем в ответе от метода POST /fetchAndLock), а также с сопутствующими значениями из variables.
Дальше сообщение появляется в очереди для соответствующего адаптера, адаптер
вычитывает сообщение, собирает тело запроса, делает вызовы к внешним API и
обрабатывает ответ. По результатам ответа от внешней системы адаптер может положить сообщение с разными ключами для успешной или ошибочной обработки сообщения (complete или bpmnError), либо если внешняя система недоступна отложить сообщение обратно в очередь для повторного обращения во внешнюю систему через определенное время (настраиваемое в очереди, в нашем случае RabbitMQ).
После обработки на сервисе-адаптере, формируется сообщение для сервиса camunda-subscriber который от типа сообщения вызывает соответствующие методы POST /{id}/complete или POST /{id}/bpmnError, тем самым процесс продолжает движение до следующего call activity.
async def complete(self, event: Event):
body = {
"workerId": event.workerId,
"variables": Variables.format(event.variables),
}
return await self.httpx_client.send_data(f'/{event.taskId}/complete', body)
async def bpmn_failure(self, event: Event):
body = {
"workerId": event.workerId,
"errorCode": event.variables['error_code'],
"errorMessage": event.variables['error_message'],
"variables": Variables.format(event.variables),
}
return await self.httpx_client.send_data(f'/{event.taskId}/bpmnError', body)
А что с идемпотентностью?
При использовании POST: /rest/process-definition/key/{key}/start
возникает вопрос о транзакционных гарантиях, нужно предусмотреть корректность обработки ответа на случай сбоев или дублей.
Чтобы избежать повторный запуск bpmn процесса, мы делаем так: если сервис запуска bpmn процессов случайно получил дублированное сообщение - проверяем, не запущен ли уже процесс по данному id заказа.
Результат внедрения паттерна external task в проект с универсальным bpmn процессом:
архитектура системы проще масштабируется
повысилась отказоустойчивость
сервисы адаптеры можно реализовывать на разных языках программирования
стал чище git репозиторий.
На будущее
В планах минимизировать, либо отказаться от JavaScript кода в Script Tasks, и переложить данные задачи на внешние обработчики, что позволит писать чистый код на Python покрытый тестами, и минимизировать логику в коде Script Tasks
Как использовать паттерн external task описано в официальной документации:
https://camunda.com/blog/2015/11/external-tasks/
https://docs.camunda.org/manual/latest/user-guide/process-engine/external-tasks/
У camunda есть хорошее, многофункциональное API, официальная документация:
Комментарии (4)
atshaman
25.09.2023 17:45+1Угу. Тоже ходил этой дорожкой - делегат-коннектор-адаптер+таска. Правда когда количество адаптеров перевалило за два десятка с 95% общей логикой - появился сильный искус кидать данные по топикам кафки с валидацией схем и зацепить на это дело один единственный обработчик, который будет вычитывать топики, маппить имя топика-сервис и пихать данные по соответствующему адрес.
aborouhin
Я так понимаю, у Вас седьмая Камунда? Переход на восьмую (zeebe) так или иначе потребует вынесения всего во внешние таски.
Andrew_Yr Автор
Да, на данный момент седьмая, возможно в рамках импортозамещения перейдем на собственное решение автоматизированной системы управления бизнес-процессами "Поток"
mashkovd
"поток" - это опер сорс продукт? Есть ссылка на репу?