Допустим, необходимо запустить множество фоновых задач в .Net с возможностью в дальнейшем обратиться к их состоянию. Обращение к членам класса запущенной задачи может быть полезно, если необходимо однозначно определить текущее состояние объекта в момент выполнения.
Одно из частных решений — модель акторов
Модель акторов позволяет запускать фоновые задачи и обращаться к их текущему состоянию.
Интерфейсы
Определим интерфейс, который опишет выполняемую работу — IJob, с обязательной передачей в метод DoAsync
- CancellationToken token
.
public interface IJob<in TIn, out TOut>
where TIn : IJobInput
where TOut : IJobResult
{
Task<bool> DoAsync(TIn input, CancellationToken token);
/// <summary>
/// Describes the state of the fields of the IJob class that are changed by the DoAsync method.
/// </summary>
TOut GetCurrentState(Guid jobId);
}
Интерфейс джоба - IJob определен, но вызов его имплементации напрямую не даст искомых результатов, поэтому необходимо определить контекст выполнения джобов IJobContext, который объединит группу выполняемых однотипных задач. Здесь необходимо предоставить возможность не только запускать фоновые таски CreateJobAsync
, но и ожидать полного выполнения задачи DoJobAsync
.
public interface IJobContext<in TIn, TOut>
where TIn : IJobInput
where TOut : IJobResult
{
/// <summary>
/// Create a background job
/// </summary>
/// <returns>Job Id</returns>
Task<JobCreatedCommandResult> CreateJobAsync(TIn input,
int? maxNrOfRetries = null,
TimeSpan? minBackoff = null,
TimeSpan? maxBackoff = null,
Guid? jobId = null,
TimeSpan? timeout = null);
/// <summary>
/// Waiting for a response about the completion of the job
/// </summary>
Task<JobDoneCommandResult> DoJobAsync(TIn input,
int? maxNrOfRetries = null, TimeSpan? minBackoff = null, TimeSpan? maxBackoff = null, Guid? jobId = null, TimeSpan? timeout = null);
Task<StopJobCommandResult> StopJobAsync(Guid jobId, TimeSpan? timeout = null);
Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId, TimeSpan? timeout = null);
}
Реализация
Основной идеей модели акторов является изоляция состояния каждого объекта модели от внешнего вмешательства, что должно гарантировать предсказуемое изменение своего состояния. Общение между акторами происходит через очереди сообщений, в которые для каждого актора упорядочивают все входящие запросы, что дает последовательность выполнения команд и также позволяет избежать побочных эффектов влияния на члены класса актора.
Общая схема запросов будет выглядеть как MasterActor→ GroupActor → ManagerActor → WorkerActor
MasterActor
Определим точку входа в систему акторов — это будет MasterActor.
Его задачами является доставка сообщений до своих дочерних акторов и определение стратегии перезапуска подчиненных элементов в случае ошибки.
Для отправки сообщений из внешних источников, которые не находятся в системе акторов ActorSystem (здесь это JobContext) используется команда ожидания ответа - Ask.
await _masterActor.Ask<JobCreatedCommandResult>(command, currentTimeout);
Ask ожидает ответа от модели акторов JobCreatedCommandResult
, для заданного промежутка времени currentTimeout. Где команда имеет тип DoJobCommand
и принимается MasterActor методом Receive
, заданном в конструкторе.
DoJobCommandHandler
- метод, который передаст сообщение существующему групповому актору или создаст его новый экземпляр в случае отсутсвия с созранением ссылки IActorRef на него.
StopJobCommandHandler
- метод вызываемый в момент запроса на остановку воркера.RequestAllWorkersInfoQueryHandler
- хендлер запросов состояния группы акторов определенного типа IJob
internal class MasterActor<TIn, TOut> : ReceiveActor
where TIn : IJobInput
where TOut : IJobResult
{
private readonly Dictionary<string, IActorRef> _groupIdToActor = new();
private readonly Dictionary<IActorRef, string> _actorToGroupId = new();
public MasterActor()
{
//Commands
Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
Receive<StopJobCommand>(StopJobCommandHandler);
//Queries
Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);
}
private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
{
if (_groupIdToActor.TryGetValue(doJobCommand.GroupName, out var actorRef))
{
actorRef.Forward(doJobCommand);
return;
}
var groupActorProps = DependencyResolver
.For(Context.System)
.Props<GroupActor<TIn,TOut>>();
var groupActor = Context
.ActorOf(groupActorProps, $"group-{doJobCommand.GroupName}");
Context.Watch(groupActor);
groupActor.Forward(doJobCommand);
_groupIdToActor.Add(doJobCommand.GroupName, groupActor);
_actorToGroupId.Add(groupActor, doJobCommand.GroupName);
}
private void StopJobCommandHandler(StopJobCommand command)
{
if (!_groupIdToActor.ContainsKey(command.GroupName))
{
Sender.Tell(new StopJobCommandResult(false, $"Group Actor list does not contain {command.GroupName}"));
return;
}
_groupIdToActor[command.GroupName].Forward(command);
}
...
GroupActor
DoJobCommandHandler должен создать дочерний актор GroupActor, который опишет группу выполняемых задач. Его функция проста, он должен передавать сообщения управленцам рабочих и сохранять ссылки на дочерние акторы для предоставления в дальнейшем возможности обратиться к их состоянию.
TrySaveWorkerActorRefCommand
- сообщение которое передается от workerActor для сохранения ссылки IActorRef в групповом акторе.ManagerActorTerminatedHandler
- вызывается после остановкиManagerActor
и удаляет ссылки на объектыIActorRef
дляManagerActor
иWorkerActor
.
internal class GroupActor<TIn, TOut> : ReceiveActor
where TIn : IJobInput
where TOut : IJobResult
{
private string? _groupId;
private readonly Dictionary<Guid, IActorRef> _idToManagerActor = new();
private readonly Dictionary<IActorRef, Guid> _managerActorToId = new();
private readonly Dictionary<IActorRef, Guid> _workerActorToId = new();
private readonly Dictionary<Guid, IActorRef> _idToWorkerActor = new();
public GroupActor()
{
//Commands
Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
Receive<StopJobCommand>(StopJobCommandHandler);
//Queries
Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);
//Internal
Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
Receive<Terminated>(ManagerActorTerminatedHandler);
}
private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
{
if (_groupId != null && doJobCommand.GroupName != _groupId)
{
var message = "Ignoring Create Worker Actor";
Sender.Tell(doJobCommand.IsCreateCommand
? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
: new JobDoneCommandResult(false, message, doJobCommand.JobId));
return;
}
if (_idToManagerActor.ContainsKey(doJobCommand.JobId))
{
var message = $"{doJobCommand.JobId} Actor Exists.";
Sender.Tell(doJobCommand.IsCreateCommand
? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
: new JobDoneCommandResult(false, message, doJobCommand.JobId));
return;
}
_groupId ??= doJobCommand.GroupName;
var managerActorProps = DependencyResolver
.For(Context.System)
.Props<ManagerActor<TIn,TOut>>();
var managerActor = Context.ActorOf(managerActorProps,
$"manager-{doJobCommand.JobId}");
Context.Watch(managerActor);
_idToManagerActor.Add(doJobCommand.JobId, managerActor);
_managerActorToId.Add(managerActor, doJobCommand.JobId);
managerActor.Forward(doJobCommand);
}
private void ManagerActorTerminatedHandler(Terminated t)
{
var workerId = _managerActorToId[t.ActorRef];
_managerActorToId.Remove(t.ActorRef);
_idToManagerActor.Remove(workerId);
if (!_idToWorkerActor.TryGetValue(workerId, out var workerActorRef))
return;
_workerActorToId.Remove(workerActorRef);
_idToWorkerActor.Remove(workerId);
}
...
ManagerActor
С поведением ManagerActor дела обстоят чуть сложнее, ему необходимо определить стратегию поведения при возникновении ошибок у рабочего WorkerActor, и сохранить начальный запрос «WorkerDoJobCommand» для повторных отправок этого запроса в моменты перезапуска.
internal class ManagerActor<TIn, TOut> : ReceiveActor
where TIn : IJobInput
where TOut : IJobResult
{
//Параметры стратегии перезапуска
private int _currentNrOfRetries;
private int _maxNrOfRetries;
private TimeSpan _minBackoff;
private TimeSpan _maxBackoff;
//Ссылка на отправитель команды на выполнение работы
private IActorRef? _doJobCommandSender;
//BackoffSupervisor
private IActorRef? _workerSupervisorActor;
//Команда для дочернего актора для повторной отправки в момент рестартов
private WorkerDoJobCommand<TIn>? _doJobCommand;
private Guid _jobId;
private bool _startedFlag;
//StopJobCommandHandler переданный от JobContext вызовет отмену задачи
private readonly CancellationTokenSource _cancellationTokenSource = new ();
private readonly ILogger<ManagerActor<TIn, TOut>> _logger;
public ManagerActor(ILogger<ManagerActor<TIn, TOut>> logger)
{
_logger = logger;
//Commands
Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
Receive<StopJobCommand>(StopJobCommandHandler);
//Queries
Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);
//Internal
Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
Receive<GiveMeWorkerDoJobCommand>(GiveMeWorkerDoJobCommandHandler);
Receive<Terminated>(WorkerActorTerminatedHandler);
}
...
DoJobCommandHandler
- только создает воркера без отправки сообщения на выполнение команды, и задает ему стратегию перезапуска, которая определена «BackoffSupervisor
» в момент создания WorkerActor. Перезапуск воркера будет произведен если возникнет ошибка в самом воркере и будет выполнен количество равноеmaxNrOfRetries
раз в интервале отminBackoff
доmaxBackoff
. Интервал необходим для того, чтобы повторные попытки выполнить работу в случае подключения к внешним ресурсам равномерно распределили запросы в заданный промежуток времени.GiveMeWorkerDoJobCommandHandler
- обработчик сообщенияGiveMeWorkerDoJobCommand
, которое присылает дочеркий воркер актор в момент инициализации своего состояния - это необходимо для перезапусков, так как если отправить сообщение в самомDoJobCommandHandler
, который выполняется один раз, то в момент падения воркер актора его работа не будет начата заново.
private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
{
if (_workerSupervisorActor != null)
{
var message = "Ignoring Create Worker Actor";
Sender.Tell(doJobCommand.IsCreateCommand
? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
: new JobDoneCommandResult(false, message, doJobCommand.JobId));
return;
}
_jobId = doJobCommand.JobId;
_maxNrOfRetries = doJobCommand.MaxNrOfRetries;
_minBackoff = doJobCommand.MinBackoff;
_maxBackoff = doJobCommand.MaxBackoff;
_doJobCommandSender = Sender;
var workerActorProps = DependencyResolver
.For(Context.System)
.Props<WorkerActor<TIn,TOut>>();
var supervisorOfWorkerActorProps = BackoffSupervisor.Props(
Backoff.OnFailure(
workerActorProps,
childName: $"worker-{doJobCommand.JobId}",
minBackoff: _minBackoff,
maxBackoff: _maxBackoff,
randomFactor: 0.2,
maxNrOfRetries: _maxNrOfRetries)
.WithSupervisorStrategy(new OneForOneStrategy(exception =>
{
if (exception is TaskCanceledException
|| exception.InnerException is TaskCanceledException
|| _currentNrOfRetries >= _maxNrOfRetries)
{
var text = $"BackoffSupervisor: jobId: {_jobId}" +
$" {exception?.Message}" +
$" InnerException: {exception?.InnerException?.Message}";
_logger.LogError(text);
_doJobCommandSender.Tell(new JobDoneCommandResult(false, text, _jobId));
return Directive.Stop;
}
_currentNrOfRetries += 1;
return Directive.Restart;
})));
_workerSupervisorActor = Context
.ActorOf(supervisorOfWorkerActorProps, $"supervisor-of-worker-{doJobCommand.JobId}");
Context.Watch(_workerSupervisorActor);
_doJobCommand = new WorkerDoJobCommand<TIn>(
doJobCommand.JobInput,
_doJobCommandSender,
doJobCommand.JobId,
_cancellationTokenSource,
doJobCommand.IsCreateCommand);
}
...
WorkerActor
Далее команда выполнения попадает к рабочему WorkerActor, который отправит сообщения о создании работы JobCreatedCommandResult
или о её завершении JobDoneCommandResult
в зависимости от изначально выбранного метода в IJobContext
CreateJobAsync
или DoJobAsync
соответственно.
По завершении работы WorkerActor положет себе за щеку смертельную пилюлю _self.Tell(PoisonPill.Instance)
, что приведет к его остановке и очистке ресурсов.
internal class WorkerActor<TIn, TOut> : ReceiveActor
where TIn : IJobInput
where TOut : IJobResult
{
private Guid _jobId;
private readonly IServiceScope _scope;
private readonly IActorRef _self;
private IJob<TIn, TOut> _job;
public WorkerActor(IServiceProvider serviceProvider)
{
_self = Self;
_scope = serviceProvider.CreateScope();
//Commands
Receive<WorkerDoJobCommand<TIn>>((msg) =>
{
WorkerDoJobCommandHandlerAsync(msg).PipeTo(_self);
});
//Queries
Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);
//Internal
Receive<Status.Failure>(Failed);
Context.Parent.Tell(new GiveMeWorkerDoJobCommand());
}
private async Task WorkerDoJobCommandHandlerAsync(WorkerDoJobCommand<TIn> command)
{
_jobId = command.JobId;
Context.Parent.Tell(new TrySaveWorkerActorRefCommand(_self, _jobId, command.DoJobCommandSender));
if(command.IsCreateCommand)
command.DoJobCommandSender.Tell(new JobCreatedCommandResult(true, "", _jobId));
var token = command.CancellationTokenSource.Token;
var jobResult = await _job.DoAsync(command.JobInput, token);
if(token.IsCancellationRequested)
{
if(!command.IsCreateCommand)
command.DoJobCommandSender.Tell(new JobDoneCommandResult(false,
"Job was cancelled.",
command.JobId));
return;
}
if(!command.IsCreateCommand)
command.DoJobCommandSender.Tell(new JobDoneCommandResult(jobResult, "Ok", command.JobId));
_self.Tell(PoisonPill.Instance);
}
Возможность отмены задач
Предусмотрена возможность отмены задачи. ManagerActor в момент получения команды StopJobCommand на остановку, вызывает отмену своего _cancellationTokenSource.Cancel(),
кансел токен которого был прокинут в дочерний WorkerActor, и отправку ему смертельной пилюли _workerSupervisorActor.Tell(PoisonPill.Instance).
private void StopJobCommandHandler(StopJobCommand _)
{
if (!_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Cancel();
_workerSupervisorActor.Tell(PoisonPill.Instance);
Sender.Tell(new StopJobCommandResult(true, "Ok"));
return;
}
Sender.Tell(new StopJobCommandResult(false, "Cancellation Requested Already."));
}
DI - Microsoft.Extensions.DependencyInjection
В WorkerActor инъектирован интерфейс IServiceProvider serviceProvider
, что позволяет управлять временем жизни скоупа _scope
от момента инициализации воркера до момента остановки актора и очистки ресурсов. Для джоба пользователя скоуп - это все, что находится внутри метода IJob.DoAsync
. Вызов _scope.Dispose(
) также вызовет методы Dispose
, если в имплементации IJob
будет также унаследован IDispoosable
интерфейс.
protected override void PreStart()
{
_job = _scope.ServiceProvider.GetService<IJob<TIn, TOut>>();
}
protected override void PostStop()
{
_scope.Dispose();
}
Запросы
JobContext позволяет запросить состояние акторов, которые были созданы для определенного типа IJob
.
public async Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId,
TimeSpan? timeout = null)
{
var currentTimeout = timeout ?? _defaultTimeout;
var query = new RequestAllWorkersInfo(requestId, GetGroupName(), currentTimeout);
RespondAllWorkersInfo<TOut> info = await _masterActor
.Ask<RespondAllWorkersInfo<TOut>>(query, currentTimeout);
return info.WorkersData;
}
Использование
Для регистрации необходимо при старте проекта вызвать методы
//Job library registration
builder.Services.AddScoped<IJob<ForEachJobInput, ForEachJobResult>, ForEachJob>();
builder.Services.AddJobContext();
Где ForEachJob
— это простая тестовая задача, которая перебирает значения от 0 до Count
c ожидаеним секунды между шагами. В данном тестовом джобе ForEachJob
отдельно унаследован интерфейс IDisposable
, что позволяет дополнительно определить механизм очистки ресурсов в момент остановки задачи или после её штатного завершения работы.
public class ForEachJob : IJob<ForEachJobInput, ForEachJobResult>, IDisposable
{
private int _currentState;
private readonly ILogger<ForEachJob> _logger;
//Здесь возможна инъекция любого зарегестрированного в приложении интерфейса
public ForEachJob(ILogger<ForEachJob> logger)
{
_logger = logger;
}
public async Task<bool> DoAsync(ForEachJobInput input, CancellationToken token)
{
foreach (var item in Enumerable.Range(0, input.Count))
{
if (token.IsCancellationRequested)
return false;
_currentState = item;
_logger.LogInformation(item.ToString());
await Task.Delay(1000, token);
}
return true;
}
public ForEachJobResult GetCurrentState(Guid jobId)
{
return new ForEachJobResult
{
Id = jobId,
Data = _currentState
};
}
public void Dispose()
{
_logger.LogInformation("Dispose.");
}
}
Вызов, например, из контроллера будет выглядить так:
public class ForEachJobController : ControllerBase
{
private readonly IJobContext<ForEachJobInput, ForEachJobResult> _jobContext;
public ForEachJobController(
IJobContext<ForEachJobInput, ForEachJobResult> jobContext)
{
_jobContext = jobContext;
}
[HttpPost]
[Route(nameof(CreateJob))]
public async Task<JobCreatedCommandResult> CreateJob([FromBody] ForEachJobInput input)
{
return await _jobContext.CreateJobAsync(input);
}
[HttpPost]
[Route(nameof(DoJob))]
public async Task<JobDoneCommandResult> DoJob([FromBody] ForEachJobInput input)
{
return await _jobContext.DoJobAsync(input);
}
[HttpPost]
[Route(nameof(StopJob))]
public async Task<StopJobCommandResult> StopJob([FromBody] Guid jobId)
{
return await _jobContext.StopJobAsync(jobId);
}
[HttpGet]
[Route(nameof(GetAllJobs))]
public async Task<ICollection<ForEachJobResult?>> GetAllJobs(
[FromQuery] int requestId)
{
var result = await _jobContext
.GetAllJobsCurrentStatesAsync(requestId);
return result.Values.Select(x => x.Result).ToList();
}
}
Итог
Akka один из возможных способов запуска отложенных задач с потенциальной возможностью кластеризации решения.
Комментарии (10)
MonkAlex
10.06.2023 02:34+5Постановка задачи непонятна.
Чего хотели добиться то?
Чтобы просто запустить кучу задач и знать статус их выполнения, достаточно сделать нужное количество
Task.Run(...)
и смотреть, неcompleted
ли каждая нужная из них. Положите в структуру\класс, для того чтобы поименовать или как-то ещё отметить.Dglkdpodg Автор
10.06.2023 02:34@MonkAlex, добрый день, задача состоит в создании интерфейса IJobContext, имплементация которого для метода DoAsync абстрагирует работу со скоупом Di, определит стратегию перезапуска задачи, в нужное время (
StopAsync
) самостоятельно отменитCancellationTokenSource
и даст возможность получить нетолько результат работы задачи, но и в момент ее выполнения даст возможность обращаться к ее текущему состоянию. Голый таск ран вполне может быть использован для некоторых отдельных функций из списка выше, но для всех разом - это уже будет какая-то другая имплементация требуемого списка функциональности из IJobContext не акке, а на таск ранеMonkAlex
10.06.2023 02:34+3Не всё понял в вашем ответе, но поясню, к чему был мой вопрос. Когда вы пишите статью, очень неплохо для начала объяснить, какие задачи решаете. Т.е. пояснить, что вот есть задача А, со своими особенностями. И вот такой инструмент как Б, позволяет написав вот такой код, получить нужный результат.
А вашу статю я читаю как "вот есть такой код, который позволяет получить что-то". В конце упомянута акка, что добавило мне понимания. Но всё ещё неясно, какую изначально задачу решаем.
Отдельно отмечу, что акка - это обычно что-то на тему concurrent & distributed, что в статье не вижу, может пропустил.
nronnie
10.06.2023 02:34Чего хотели добиться то?
Я так понял, что тут хотели показать примеры работы с каким-то фреймворком для подобных задач, но, как-то явно про это не написали, только ссылка в конце статьи.
RouR
10.06.2023 02:34+7Общая схема запросов будет выглядеть как MasterActor→ GroupActor → ManagerActor → WorkerActor
Это оверинжиниринг, на мой взгляд. Ещё и акторы зачем-то.
В .net есть BackgroundService и всё сводится к 3м сервисам:
Общая очередь (можно в памяти, синглтон или статик ConcurrentQueue) с данными для таски
BackgroundService который смотрит эту очередь и запускает таску
Сервис с бизнес-логикой самой таски. Паттерн стратегия.
Dglkdpodg Автор
10.06.2023 02:34@RouR , согласен, если просто фономо нужно что-то запустить, то BackgroundService - отличное решение
mvv-rus
10.06.2023 02:34Чтобы просто фоном запустить что-нибудь достаточно Task.Run, чтобы передать команду на завершение - передать в исполняемый через делегат метод CancellationToken а в коде метода - смотреть, что он не отменен, чтобы отслеживать custom-состояние - сделать в классе volatile поле (с доступом снаружи к нему напрямую или через свойство), имеющее тип, обновления которого атомарны (т.е., грубо говоря - не struct) и писать туда из метода состояние по мере выполнения - и так далее, по мере усложнения задачи.
Так что присоединяюсь к мнению@MonkAlex: код из вашей статьи решает не приведенную в начале текста простую задачу, а гораздо более сложную.
dyadyaSerezha
1) в мастеровом DoJobCommandHandler:
groupActor.Forward(doJobCommand); _groupIdToActor.Add(doJobCommand.GroupName, groupActor); _actorToGroupId.Add(groupActor, doJobCommand.GroupName);
Почему сначала вызывается метод Forward, а потом актор добавляется в словари? Методологически надо наоборот, а вызывать Forward вообще один раз для обеих ветвей (новый актор или старый).
2) в Итоге впервые, без объяснений встречается слово akka. Надо в начале хотя бы в одном предложении заметить, что показана работа библиотеки akka.
3) хорошо бы добавить обратные уведомления о прогрессе (проценте) выполненной задачи.
Dglkdpodg Автор
@dyadyaSerezha, 3 ий пункт выглядит, как очень даже интересная функциональность
dyadyaSerezha
А кстати, почему во всех словарях - groupId, а само свойство - groupName? Вот так, на мелочах, шпионы и прокалываются...)