Привет! Меня зовут Андрей, я - ведущий разработчик в "РТК ИТ". В этой статье речь пойдет об опыте перехода на паттерн external task в одной из наших систем.

Camunda — это BPM-движок для автоматизации бизнес-процессов. Она представляет собой набор библиотек, которые и позволяют выполнять описанные процессы.

Как мы в проекте работаем с camunda

В задачи проекта входит разбор и маршрутизация заказов с вызовами разных внешних API. Camunda используется в проекте для запуска разных процессов, построения маршрутов процесса с установкой соответствующих статусов, обработкой ответов внешних API с разными кейсами и ожиданием в процессе изменений статусов заказа.

Первоначальная реализация проекта

message broker - очередь реализованная через RabbitMQ;

bpmn-launcher - сервис для работы с процессами (запуск и доставка сообщений конкретному выполнению, чтобы активировать существующую подписку на событие сообщения);

engine-rest - путь к приложению camunda по умолчанию;

business process - запускаемые bpmn процессы;

external system - api внешних систем.

Сервис “bpmn-launcher” который получает из очереди сообщение с id заказа, по данным из заказа определяем какой bpmn процесс нужно запускать, для этого используем метод

POST: /rest/process-definition/key/{key}/start для запуска нужного процесса

Ниже описан выдуманный процесс создания заказа, каждый call activity вызывал свой bpmn процесс

Пример BPMN-схемы
Пример BPMN-схемы

Сall activity (или повторно используемый подпроцесс) позволяет вызывать и другой процесс как часть этого процесса. Он похож на встроенный подпроцесс, но этот процесс является внешним (т. е. хранится как отдельный BPMN) и может вызываться различными процессами.

Представление Сall activity
Представление Сall activity
Пример BPMN-схемы для call activity “Проверить email в SSO”
Пример BPMN-схемы для call activity “Проверить email в SSO”
Пример BPMN-схемы для call activity “Создать заказ”
Пример BPMN-схемы для call activity “Создать заказ”

Script Task - задача сценария, используется для моделирования оценки сценария, например, сценарий написанный на Groovy или JavaScript

Script Task
Script Task

Service task - служебная задача представляет рабочий элемент в процессе определенного типа

Пример BPMN-схемы из Service task
Пример BPMN-схемы из Service task

С развитием проекта появилась необходимость уйти от взаимодействия между camunda и внешними API. Взаимодействие было реализовано  с помощью синхронного вызова java-кода. Изначально, когда проектировали систему, общение с внешними API было реализовано через Service Task c Implementation type: Connector и Connector ID: http-connector в camunda с подготовкой тела запроса и разбора ответа с помощью js в Script Task в отдельно вызываемых call activity, и передачей необходимого метода и url.

Окно настроек в Camunda Modeler
Окно настроек в Camunda Modeler

Плюсы такого решения: быстрое применение изменений в bpmn и быстрый вывод процесса на стенд. Этот вариант хорошо подходит для тестирования новой системы - когда необходимо часто править тело запроса, когда часто меняются требования.

Проблемы с которыми столкнулись в процессе: сложность обработки ответов по API, сложная реализация повторных запросов при неуспешных ответах, необходимо было реализовывать отдельный call activity под каждый процесс и под каждый вызов.

Проанализировав возможные решения данных проблем, пришли к решению внедрять универсальный механизм external task, так как данное решение дает масштабируемую и отказоустойчивую архитектуру и даёт возможность делегировать работу в какой-либо внутренний сервис - адаптер.

External task - позволяет настроить задачи удаленного обслуживания для вашего рабочего процесса.

Внедрение external task в проект

message broker - очередь реализованная через RabbitMQ;

bpmn-launcher - сервис для работы с процессами (запуск и доставка сообщений конкретному выполнению, чтобы активировать существующую подписку на событие сообщения);

camunda-subscriber - сервис для работы с активными топиками в camunda с блокировкой, и отправкой сообщения в rabbit;

camunda-publisher - сервис для работы с заблокированными топиками (выполнение внешней задачи с обновлением переменных в процессе);

adapters - разные сервисы исполнители внешних задач с обращением во внешние системы;

engine-rest - путь к приложению camunda по умолчанию;

business process - запускаемые bpmn процессы;

external system - api внешних систем.

Для внедрения external task нам понадобилось реализовать сервисы которые общаются с camunda, camunda-subscriber и camunda-publisher, и внутренние сервисы-адаптеры для взаимодействия с внешними API.

Так же сделали универсальный вызываемый подпроцесс bpmn. В данный подпроцесс передается переменная, которая затем используется в поле topic у service task. Это позволяет нам не увеличивать количество вызываемых bpmn процессов, и настроить универсальную обработку вызываемого процесса.

Чтобы создать external task как в примере выше, необходимо:

  1. Создать task.

  2. Поменять его тип на service task.

  3. Установить implementation на external.

  4. Указать значение поля topic.

Окно настроек в Camunda Modeler
Окно настроек в Camunda Modeler

сamunda-subscriber постоянно использует метод POST  /fetchAndLock, чтобы получить список задач по topicName и закрепить их за собой (так как может быть поднято несколько подов данного сервиса).

async def fetch_and_lock(self):
   body = {
       "workerId": self.worker_id,
       "maxTasks": config.max_tasks,
       "topics": self._get_topics(),
   }


   return await self.httpx_client.send_data('/fetchAndLock', body)


def _get_topics(self):
   topics = []
   for topic in config.topic_names:
       topics.append({
           "topicName": topic,
           "lockDuration": self.config["lockDuration"], # How much time the worker thinks he needs to process the task
       })
   return topics

Затем сообщение отправляется в очередь для адаптеров с соответствующими id & workerId(данные атрибуты мы получаем в ответе от метода POST /fetchAndLock), а также с сопутствующими значениями из variables.

Дальше сообщение появляется в очереди для соответствующего адаптера, адаптер 

вычитывает сообщение, собирает тело запроса, делает вызовы к внешним API и 

обрабатывает ответ. По результатам ответа от внешней системы адаптер может положить сообщение с разными ключами для успешной или ошибочной обработки сообщения (complete или bpmnError), либо если внешняя система недоступна отложить сообщение обратно в очередь для повторного обращения во внешнюю систему через определенное время (настраиваемое в очереди, в нашем случае RabbitMQ).

После обработки на сервисе-адаптере, формируется сообщение для сервиса camunda-subscriber который от типа сообщения вызывает соответствующие методы POST /{id}/complete или POST /{id}/bpmnError, тем самым процесс продолжает движение до следующего call activity.

async def complete(self, event: Event):
   body = {
       "workerId": event.workerId,
       "variables": Variables.format(event.variables),
   }


   return await self.httpx_client.send_data(f'/{event.taskId}/complete', body)

async def bpmn_failure(self, event: Event):
   body = {
       "workerId": event.workerId,
       "errorCode": event.variables['error_code'],
       "errorMessage": event.variables['error_message'],
       "variables": Variables.format(event.variables),
   }


   return await self.httpx_client.send_data(f'/{event.taskId}/bpmnError', body)

А что с идемпотентностью?

При использовании  POST: /rest/process-definition/key/{key}/start
возникает вопрос о транзакционных гарантиях, нужно предусмотреть корректность обработки ответа на случай сбоев или дублей.

Чтобы избежать повторный запуск bpmn процесса, мы делаем так: если сервис запуска bpmn процессов случайно получил дублированное сообщение - проверяем, не запущен ли уже процесс по данному id заказа. 

Результат внедрения паттерна external task в проект с универсальным bpmn процессом:

  • архитектура системы проще масштабируется

  • повысилась отказоустойчивость

  • сервисы адаптеры можно реализовывать на разных языках программирования

  • стал чище git репозиторий.

На будущее

В планах минимизировать, либо отказаться от JavaScript кода в Script Tasks, и переложить данные задачи на внешние обработчики, что позволит писать чистый код на Python покрытый тестами, и минимизировать логику в коде Script Tasks


Как использовать паттерн external task описано в официальной документации:

https://camunda.com/blog/2015/11/external-tasks/

https://docs.camunda.org/manual/latest/user-guide/process-engine/external-tasks/

У camunda есть хорошее, многофункциональное API, официальная документация:

https://docs.camunda.org/manual/7.7/reference/rest/

Комментарии (4)


  1. aborouhin
    25.09.2023 17:45

    Я так понимаю, у Вас седьмая Камунда? Переход на восьмую (zeebe) так или иначе потребует вынесения всего во внешние таски.


    1. Andrew_Yr Автор
      25.09.2023 17:45
      +1

      Да, на данный момент седьмая, возможно в рамках импортозамещения перейдем на собственное решение автоматизированной системы управления бизнес-процессами "Поток"


      1. mashkovd
        25.09.2023 17:45

        "поток" - это опер сорс продукт? Есть ссылка на репу?


  1. atshaman
    25.09.2023 17:45
    +1

    Угу. Тоже ходил этой дорожкой - делегат-коннектор-адаптер+таска. Правда когда количество адаптеров перевалило за два десятка с 95% общей логикой - появился сильный искус кидать данные по топикам кафки с валидацией схем и зацепить на это дело один единственный обработчик, который будет вычитывать топики, маппить имя топика-сервис и пихать данные по соответствующему адрес.