В Тинькофф для разработки систем автоматизации бизнес-процессов мы используем фреймворк Camunda + Spring. Сами бизнес-процессы описываем с помощью BPMN (Business Process Management Notation) в виде блок-схем.
Наиболее часто используемый элемент на наших схемах — service tasks (прямоугольник с шестеренкой). Camunda поддерживает два способа выполнения service tasks:
- С помощью синхронного вызова java-кода.
- Создание external task.
Второй способ позволяет выполнять задачи с помощью внешних систем — например, если нужно вызвать одно camunda-приложение из другого или вообще делегировать работу в какую-либо внешнюю систему.
Пример BPMN-схемы
Это полезно, когда вы собираетесь переиспользовать логику в нескольких приложениях. Или когда вы хотите придерживаться микросервисной архитектуры. Например, отделяя сервисы, которые занимаются бизнес-процессами, от сервисов, которые реализуют технические задачи, такие как генерация отчетов или рассылка.
Вместе с этой возможностью external task дает масштабируемую, отказоустойчивую архитектуру. Чтобы понять, за счет чего это происходит, сначала нужно понять, как external task работает на уровне BPMN и на уровне приложения.
External task in BPMN
External task подразумевает создание задачи, которая может быть выполнена внешним обработчиком. Суть паттерна external task заключается в том, что:
- Процесс, который «заказывает» выполнение задачи, просто добавляет в БД свой «заказ».
- Некий абстрактный обработчик запрашивает у camunda задачи на обработку, при этом закрепляя задачу за собой так, чтобы ее не мог выполнить другой обработчик.
- После выполнения задачи обработчик сообщает camunda результат выполнения (успешный/неуспешный).
На схеме выше я описал выдуманный процесс, в котором мы хотим получить список пользователей, отправить им рекламу и через 2 часа посчитать количество заявок после маркетинговой рассылки. И, если заявок больше 10, увеличить выборку для следующей рассылки.
Я хочу, чтобы мое приложение на camunda отвечало только за бизнес-процессы, а email-рассылками занималось любое другое приложение. В таком случае мне отлично подходит паттерн external task. В своем процессе я просто создам задачу на имейл-рассылку и буду ждать, когда ее выполнит какой-нибудь внешний обработчик.
Чтобы на схеме создать external task, необходимо:
- Создать обычный task.
- Поменять его тип на service task.
- Установить implementation на external.
- Указать значение поля Topic.
Topic — это название очереди, в которую будут складываться задачи одного типа и на которую будет подписываться внешний обработчик.
Теперь, когда в процессе есть external task, можно его запустить, но выполняться он не будет, так как никто его не обрабатывает.
External tasks worker
Паттерн external task хорош тем, что он позволяет реализовывать обработку задач на любом языке, с помощью любых инструментов, которые могут выполнять HTTP-запросы.
Ниже приведен пример из блога camunda. В примере реализован внешний обработчик на javascript, который раз в 20 секунд запрашивает у camunda список задач на обработку. Если есть задачи, то выполняет по ним рассылку и уведомляет camunda о завершении задачи.
const baseUrl = 'http://localhost:8080/my-app/rest';
const workerSettings = {
workerId: 'worker01', // some unique name for the current worker instance
maxTasks: 5,
topics: [
{
topicName: 'sendEmail',
lockDuration: 10000, // How much time the worker thinks he needs to process the task
variables: ['video'] // Which variables should be returned in the response (to avoid additional REST calls to read data)
}]};
const requestParams = {method: 'POST', headers: {contentType: 'application/json'}};
function pollExternalTasks() {
return fetch(`${baseUrl}/external-task/fetchAndLock`, {
...requestParams,
body: JSON.stringify(workerSettings)
})
}
function processExternalTask(result = []) {
return Promise.all(result.map(externalTask => {
sendEmail(externalTask); // Here the actual work would be done
return fetch(`${baseUrl}/external-task/${externalTask.id}/complete`, {
...requestParams,
body: JSON.stringify({workerId: workerSettings.workerId}),
})
}));
}
setInterval(() => {
pollExternalTasks().then(processExternalTask)
}, 20000);
Как видно из кода выше, ключевыми методами для обработки external tasks являются fetchAndLock и complete. Первый метод запрашивает список задач и закрепляет их выполнение за собой, а второй информирует об окончании выполнения задачи. Кроме этих двух методов есть и другие, о них вы можете прочитать в официальной документации.
Camunda external task client
Для реализации обработки external tasks camunda предоставила клиенты на Javascript и Java, которые позволяют создавать обработчики внешних задач буквально в несколько строк. Еще есть подробный гайд, в котором описаны основные принципы обработки внешних задач — опять-таки с примерами на Javascript и Java.
Пример реализации внешнего обработчика с помощью ExternalTaskClient:
public class App {
public static void main(String... args) {
// bootstrap the client
ExternalTaskClient client = ExternalTaskClient.create()
.baseUrl("http://localhost:8080/engine-rest")
.asyncResponseTimeout(1000)
.build();
// subscribe to the topic
client.subscribe("sendEmail").handler((externalTask, externalTaskService) -> {
try {
String result = sendEmail(externalTask)
Map<String, Object> variables = new HashMap<>();
variables.put("result", result);
externalTaskService.complete(externalTask, variables);
System.out.println("The External Task " + externalTask.getId() + " has been completed!");
} catch (e: Exception) {
externalTaskService.handleFailure(externalTask, e.message, e.stackTrace.toString())
}
}).open();
}
}
Если ваша задача требует не просто выполнения какого-то синхронного действия, а запуска целого процесса, то вы вполне можете это сделать, например, запустив процесс через RuntimeService:
@Service
class EmailWorker(
private val runtimeService: RuntimeService
) {
val builder = ExternalTaskClientBuilderImpl().baseUrl("http://localhost:8080").workerId("myWorker")
val taskClient = builder.build()
val engineClient = (builder as ExternalTaskClientBuilderImpl).engineClient
@PostConstruct
fun init() {
taskClient
.subscribe("sendEmail")
.lockDuration(10000)
.handler { externalTask, externalService ->
runtimeService.startProcessInstanceByKey(
"SendEmailProcess",
externalTask.getVariable("emailId"),
mapOf(
"text" to externalTask.getVariable("text"),
"email" to externalTask.getVariable("email")
)
)
}
.open()
}
@PreDestroy
fun destroy() {
taskClient.stop()
}
}
// Delegate from SendEmailProcess process
@Component
class EmailResultDelegate(private val emailWorker: EmailWorker) {
fun doExecute(execution: DelegateExecution) {
emailWorker.engineClient.complete(
execution.readVar(EXTERNAL_TASK_ID),
mapOf("result" to "Success")
)
}
}
В этом примере обработчик external tasks (EmailWorker) при получении задачи запускает процесс SendEmailProcess.
Представим, что этот процесс выполняет какие-то действия, необходимые для отправки рассылки, и в конце вызывает EmailResultDelegate, который, в свою очередь, завершает выполнение external task.
Архитектурные преимущества external task
Стоит отметить, что есть способ запускать процесс в другом приложении camunda более простым способом: POST: /rest/process-definition/key/${id}/start
Когда вы используете REST, у вас нет никаких транзакционных гарантий. Но ведь с external task мы тоже работаем посредством REST, в чем тогда разница?
Разница в том, что мы не вызываем внешний сервис напрямую, а лишь публикуем задачи, которые могут быть обработаны. Рассмотрим на примере:
Некоторый внешний обработчик забирает задачу, которая теперь закреплена за ним, но при получении ответа происходит разрыв соединения. Теперь на стороне camunda заблокирована задача, которая не будет обработана, так как внешний обработчик не получил ответ. Но это не страшно: в camunda для external tasks есть тайм-аут, по которому задача снова вернется в очередь, и ее сможет обработать кто-нибудь другой.
Теперь давайте рассмотрим случай, когда внешний обработчик получил задачу, выполнил ее и вызвал метод complete, который завершился ошибкой из-за проблем сети. Теперь вы не сможете понять, была ли задача успешно завершена в camunda или нет. Вы можете попробовать снова, но есть вероятность, что проблемы с сетью будут продолжаться.
В этом случае лучшим решением будет игнорировать проблему. Если задача все-таки была успешно выполнена, то все в порядке. Если нет — по истечении тайм-аута задача снова будет доступна для обработки. Но это означает, что ваш внешний обработчик должен быть идемпотентным или содержать логику для дедупликации задач.
Аналогичная проблема может случиться при запуске нового процесса, поэтому перед этим стоит проверить существующие инстансы с такими же данными, например businessKey.
Помимо высокой отказоустойчивости external task позволяет легко масштабировать внешние обработчики и реализовывать их на любых языках программирования. При этом паттерн помогает реализовывать микросервисы так, чтобы они как можно меньше влияли друг на друга, тем самым повышая их стабильность.
Подробнее про external task:
https://docs.camunda.org/manual/latest/user-guide/process-engine/external-tasks/
https://docs.camunda.org/manual/latest/reference/rest/external-task/
https://docs.camunda.org/manual/latest/user-guide/ext-client/
potop
А пробовали в ваших процессах пускать задачи параллельно при помощи ParallelGateway?
Если да, то как вы потом пишете условия в ConditionalGateway, если параллельные задачи кладут результат в одинаковую переменную «result»?
Shoom3301 Автор
Возможно вы имели ввиду parallel multi instance?
Если так, то да, в моей задаче как раз пришлось это применять.
В процессе у меня есть список сущностей, которые нужно обработать внешним обработчиком.
Для этого я создал parallel multi instance external task и указал у него параметры Collection и Element variable. Collection — в данном случае, это переменная в контексте инстанса, которая хранит массив id пользователей. Element variable — название переменной в контексте внешней задачи (ее получит внешний обработчик).
При запуске процесса создается столько external tasks сколько элементов в у казанном Collection.
Ну и теперь собственно ответ на ваш вопрос.
Когда внешний обработчик выполнил задачу, он записывает в ответ переменную вида «result[$userId]». userId — это тот самый Element variable, который в моем случае является уникальным. Таким образом, на каждый результат external task создается уникальная переменная в контексте инстанса, ничего умнее я к сожалению не придумал :(
potop
Имелась ввиду похожая, но немного другая штука:
docs.camunda.org/manual/7.12/reference/bpmn20/gateways/parallel-gateway
Параллельно могут пускаться не разные инстансы одной таски, а произвольные таски.
Если все они пишут в переменную «result», то проблема в итоге такая же, как вы решаете при помощи коллекции, только индексом приходится брать не userId, а activityInstanceId, чтобы совсем надёжно было, и они не перезаписывали друг-друга.
Но хочется более простого решения, потому что если потом хочется кондишн запилить на основе результатов выполнения этих параллельных задач, то заранее неизвестно, какие будут индексы и не очень понятно, что там писать.