Допустим, необходимо запустить множество фоновых задач в .Net с возможностью в дальнейшем обратиться к их состоянию. Обращение к членам класса запущенной задачи может быть полезно, если необходимо однозначно определить текущее состояние объекта в момент выполнения.

Одно из частных решений — модель акторов

Модель акторов позволяет запускать фоновые задачи и обращаться к их текущему состоянию.

Интерфейсы

Определим интерфейс, который опишет выполняемую работу — IJob, с обязательной передачей в метод DoAsync - CancellationToken token.

public interface IJob<in TIn, out TOut>
    where TIn : IJobInput
    where TOut : IJobResult  
{
    Task<bool> DoAsync(TIn input, CancellationToken token);

    /// <summary>
    /// Describes the state of the fields of the IJob class that are changed by the DoAsync method.
    /// </summary>
    TOut GetCurrentState(Guid jobId);
}

Интерфейс джоба - IJob определен, но вызов его имплементации напрямую не даст искомых результатов, поэтому необходимо определить контекст выполнения джобов IJobContext, который объединит группу выполняемых однотипных задач. Здесь необходимо предоставить возможность не только запускать фоновые таски CreateJobAsync, но и ожидать полного выполнения задачи DoJobAsync.

public interface IJobContext<in TIn, TOut>
    where TIn : IJobInput
    where TOut : IJobResult
{

    /// <summary>
    /// Create a background job
    /// </summary>
    /// <returns>Job Id</returns>
    Task<JobCreatedCommandResult> CreateJobAsync(TIn input,
        int? maxNrOfRetries = null,
        TimeSpan? minBackoff = null,
        TimeSpan? maxBackoff = null,  
        Guid? jobId = null,
        TimeSpan? timeout = null);

    /// <summary>
    /// Waiting for a response about the completion of the job
    /// </summary>
    Task<JobDoneCommandResult> DoJobAsync(TIn input,
        int? maxNrOfRetries = null, TimeSpan? minBackoff = null, TimeSpan? maxBackoff = null,  Guid? jobId = null, TimeSpan? timeout = null);

    Task<StopJobCommandResult> StopJobAsync(Guid jobId, TimeSpan? timeout = null);

    Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId, TimeSpan? timeout = null);
}

Реализация

Основной идеей модели акторов является изоляция состояния каждого объекта модели от внешнего вмешательства, что должно гарантировать предсказуемое изменение своего состояния. Общение между акторами происходит через очереди сообщений, в которые для каждого актора упорядочивают все входящие запросы, что дает последовательность выполнения команд и также позволяет избежать побочных эффектов влияния на члены класса актора.

Общая схема запросов будет выглядеть как MasterActorGroupActorManagerActorWorkerActor

MasterActor

Определим точку входа в систему акторов — это будет MasterActor.

Его задачами является доставка сообщений до своих дочерних акторов и определение стратегии перезапуска подчиненных элементов в случае ошибки.

Для отправки сообщений из внешних источников, которые не находятся в системе акторов ActorSystem (здесь это JobContext) используется команда ожидания ответа - Ask.

await _masterActor.Ask<JobCreatedCommandResult>(command, currentTimeout);

Ask ожидает ответа от модели акторов JobCreatedCommandResult, для заданного промежутка времени currentTimeout. Где команда имеет тип DoJobCommand и принимается MasterActor методом Receive, заданном в конструкторе.

  • DoJobCommandHandler - метод, который передаст сообщение существующему групповому актору или создаст его новый экземпляр в случае отсутсвия с созранением ссылки IActorRef на него.

  • StopJobCommandHandler - метод вызываемый в момент запроса на остановку воркера.

  • RequestAllWorkersInfoQueryHandler - хендлер запросов состояния группы акторов определенного типа IJob

internal class MasterActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{

    private readonly Dictionary<string, IActorRef> _groupIdToActor = new();
    private readonly Dictionary<IActorRef, string> _actorToGroupId = new();

    public MasterActor()
    {
        //Commands
        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries
        Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);

    }


    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {

        if (_groupIdToActor.TryGetValue(doJobCommand.GroupName, out var actorRef))
        {
            actorRef.Forward(doJobCommand);
            return;
        }

        var groupActorProps = DependencyResolver
            .For(Context.System)
            .Props<GroupActor<TIn,TOut>>();

        var groupActor = Context
            .ActorOf(groupActorProps, $"group-{doJobCommand.GroupName}");

        Context.Watch(groupActor);

        groupActor.Forward(doJobCommand);

        _groupIdToActor.Add(doJobCommand.GroupName, groupActor);
        _actorToGroupId.Add(groupActor, doJobCommand.GroupName);

    }

    private void StopJobCommandHandler(StopJobCommand command)
    {
        if (!_groupIdToActor.ContainsKey(command.GroupName))
        {
            Sender.Tell(new StopJobCommandResult(false, $"Group Actor list does not contain {command.GroupName}"));
            return;
        }

        _groupIdToActor[command.GroupName].Forward(command);
    }
...

GroupActor

DoJobCommandHandler должен создать дочерний актор GroupActor, который опишет группу выполняемых задач. Его функция проста, он должен передавать сообщения управленцам рабочих и сохранять ссылки на дочерние акторы для предоставления в дальнейшем возможности обратиться к их состоянию.

  • TrySaveWorkerActorRefCommand - сообщение которое передается от workerActor для сохранения ссылки IActorRef в групповом акторе.

  • ManagerActorTerminatedHandler - вызывается после остановки ManagerActor и удаляет ссылки на объекты IActorRef для ManagerActor и WorkerActor.

internal class GroupActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{

    private string? _groupId;

    private readonly Dictionary<Guid, IActorRef> _idToManagerActor = new();
    private readonly Dictionary<IActorRef, Guid> _managerActorToId = new();

    private readonly Dictionary<IActorRef, Guid> _workerActorToId = new();
    private readonly Dictionary<Guid, IActorRef> _idToWorkerActor = new();

 
    public GroupActor()
    {
        //Commands
        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries
        Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);

        //Internal
        Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
        Receive<Terminated>(ManagerActorTerminatedHandler);

    }

    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {
        if (_groupId != null && doJobCommand.GroupName != _groupId)
        {
            var message = "Ignoring Create Worker Actor";
            Sender.Tell(doJobCommand.IsCreateCommand
                    ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                    : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }

        if (_idToManagerActor.ContainsKey(doJobCommand.JobId))
        {
            var message = $"{doJobCommand.JobId} Actor Exists.";
            Sender.Tell(doJobCommand.IsCreateCommand
                ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }

        _groupId ??= doJobCommand.GroupName;

        var managerActorProps = DependencyResolver
            .For(Context.System)
            .Props<ManagerActor<TIn,TOut>>();
        
        var managerActor = Context.ActorOf(managerActorProps,
            $"manager-{doJobCommand.JobId}");

        Context.Watch(managerActor);

        _idToManagerActor.Add(doJobCommand.JobId, managerActor);
        _managerActorToId.Add(managerActor, doJobCommand.JobId);
        managerActor.Forward(doJobCommand);
    }

    private void ManagerActorTerminatedHandler(Terminated t)
    {
        var workerId = _managerActorToId[t.ActorRef];
        _managerActorToId.Remove(t.ActorRef);
        _idToManagerActor.Remove(workerId);

        if (!_idToWorkerActor.TryGetValue(workerId, out var workerActorRef))
            return;
        _workerActorToId.Remove(workerActorRef);
        _idToWorkerActor.Remove(workerId);
    }
...

ManagerActor

С поведением ManagerActor дела обстоят чуть сложнее, ему необходимо определить стратегию поведения при возникновении ошибок у рабочего WorkerActor, и сохранить начальный запрос «WorkerDoJobCommand» для повторных отправок этого запроса в моменты перезапуска.

internal class ManagerActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult

{
    //Параметры стратегии перезапуска
    private int _currentNrOfRetries;
    private int _maxNrOfRetries;
    private TimeSpan _minBackoff;
    private TimeSpan _maxBackoff;

    //Ссылка на отправитель команды на выполнение работы
    private IActorRef? _doJobCommandSender;
    //BackoffSupervisor
    private IActorRef? _workerSupervisorActor;
    //Команда для дочернего актора для повторной отправки в момент рестартов
    private WorkerDoJobCommand<TIn>? _doJobCommand;

    private Guid _jobId;
    private bool _startedFlag;

    //StopJobCommandHandler переданный от JobContext вызовет отмену задачи
    private readonly CancellationTokenSource _cancellationTokenSource = new ();

    private readonly ILogger<ManagerActor<TIn, TOut>> _logger;

    public ManagerActor(ILogger<ManagerActor<TIn, TOut>> logger)
    {
        _logger = logger;

        //Commands

        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries

        Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);

        //Internal
        Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
        Receive<GiveMeWorkerDoJobCommand>(GiveMeWorkerDoJobCommandHandler);
        Receive<Terminated>(WorkerActorTerminatedHandler);

    }

 ...
  • DoJobCommandHandler - только создает воркера без отправки сообщения на выполнение команды, и задает ему стратегию перезапуска, которая определена «BackoffSupervisor» в момент создания WorkerActor. Перезапуск воркера будет произведен если возникнет ошибка в самом воркере и будет выполнен количество равное maxNrOfRetries раз в интервале от minBackoff до maxBackoff. Интервал необходим для того, чтобы повторные попытки выполнить работу в случае подключения к внешним ресурсам равномерно распределили запросы в заданный промежуток времени.

  • GiveMeWorkerDoJobCommandHandler - обработчик сообщения GiveMeWorkerDoJobCommand, которое присылает дочеркий воркер актор в момент инициализации своего состояния - это необходимо для перезапусков, так как если отправить сообщение в самом DoJobCommandHandler, который выполняется один раз, то в момент падения воркер актора его работа не будет начата заново.

    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {
        if (_workerSupervisorActor != null)
        {
            var message = "Ignoring Create Worker Actor";
            Sender.Tell(doJobCommand.IsCreateCommand
                ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }


        _jobId = doJobCommand.JobId;
        _maxNrOfRetries = doJobCommand.MaxNrOfRetries;
        _minBackoff = doJobCommand.MinBackoff;
        _maxBackoff = doJobCommand.MaxBackoff;
        _doJobCommandSender = Sender;

        var workerActorProps = DependencyResolver
            .For(Context.System)
            .Props<WorkerActor<TIn,TOut>>();

        var supervisorOfWorkerActorProps = BackoffSupervisor.Props(
            Backoff.OnFailure(
                    workerActorProps,
                    childName: $"worker-{doJobCommand.JobId}",
                    minBackoff: _minBackoff,
                    maxBackoff: _maxBackoff,
                    randomFactor: 0.2,
                    maxNrOfRetries: _maxNrOfRetries)
                .WithSupervisorStrategy(new OneForOneStrategy(exception =>
                {
                    if (exception is TaskCanceledException  
                        || exception.InnerException is TaskCanceledException
                        || _currentNrOfRetries >= _maxNrOfRetries)
                    {
                        var text = $"BackoffSupervisor: jobId: {_jobId}" +
                                   $" {exception?.Message}" +
                                   $" InnerException: {exception?.InnerException?.Message}";
                        _logger.LogError(text);
                        _doJobCommandSender.Tell(new JobDoneCommandResult(false, text, _jobId));
                        return Directive.Stop;
                    }
                  
                    _currentNrOfRetries += 1;
                    return Directive.Restart;
                })));

        _workerSupervisorActor = Context
            .ActorOf(supervisorOfWorkerActorProps, $"supervisor-of-worker-{doJobCommand.JobId}");

        Context.Watch(_workerSupervisorActor);
      
        _doJobCommand = new WorkerDoJobCommand<TIn>(
            doJobCommand.JobInput,
            _doJobCommandSender,  
            doJobCommand.JobId,
            _cancellationTokenSource,
            doJobCommand.IsCreateCommand);
    }
...

WorkerActor

Далее команда выполнения попадает к рабочему WorkerActor, который отправит сообщения о создании работы JobCreatedCommandResult или о её завершении JobDoneCommandResult в зависимости от изначально выбранного метода в IJobContext CreateJobAsync или DoJobAsync соответственно.

По завершении работы WorkerActor положет себе за щеку смертельную пилюлю _self.Tell(PoisonPill.Instance), что приведет к его остановке и очистке ресурсов.

internal class WorkerActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{
    private Guid _jobId;

    private readonly IServiceScope _scope;
    private readonly IActorRef _self;
    private IJob<TIn, TOut> _job;

 
    public WorkerActor(IServiceProvider serviceProvider)
    {
        _self = Self;
        _scope = serviceProvider.CreateScope();

        //Commands
        Receive<WorkerDoJobCommand<TIn>>((msg) =>
        {
            WorkerDoJobCommandHandlerAsync(msg).PipeTo(_self);
        });

        //Queries
        Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);

        //Internal
        Receive<Status.Failure>(Failed);
        Context.Parent.Tell(new GiveMeWorkerDoJobCommand());

    }

    private async Task WorkerDoJobCommandHandlerAsync(WorkerDoJobCommand<TIn> command)
    {
        _jobId = command.JobId;

        Context.Parent.Tell(new TrySaveWorkerActorRefCommand(_self, _jobId, command.DoJobCommandSender));

        if(command.IsCreateCommand)
            command.DoJobCommandSender.Tell(new JobCreatedCommandResult(true, "", _jobId));

        var token = command.CancellationTokenSource.Token;
        var jobResult = await _job.DoAsync(command.JobInput, token);

        if(token.IsCancellationRequested)
        {
            if(!command.IsCreateCommand)
                command.DoJobCommandSender.Tell(new JobDoneCommandResult(false,  
                    "Job was cancelled.",  
                    command.JobId));
            return;
        }

        if(!command.IsCreateCommand)
            command.DoJobCommandSender.Tell(new JobDoneCommandResult(jobResult, "Ok", command.JobId));

        _self.Tell(PoisonPill.Instance);
    }

Возможность отмены задач

Предусмотрена возможность отмены задачи. ManagerActor в момент получения команды StopJobCommand на остановку, вызывает отмену своего _cancellationTokenSource.Cancel(), кансел токен которого был прокинут в дочерний WorkerActor, и отправку ему смертельной пилюли _workerSupervisorActor.Tell(PoisonPill.Instance).

   private void StopJobCommandHandler(StopJobCommand _)
    {
        if (!_cancellationTokenSource.IsCancellationRequested)
        {
            _cancellationTokenSource.Cancel();
            _workerSupervisorActor.Tell(PoisonPill.Instance);
            Sender.Tell(new StopJobCommandResult(true, "Ok"));
            return;
        }
        Sender.Tell(new StopJobCommandResult(false, "Cancellation Requested Already."));
    }

DI - Microsoft.Extensions.DependencyInjection

В WorkerActor инъектирован интерфейс IServiceProvider serviceProvider, что позволяет управлять временем жизни скоупа _scope от момента инициализации воркера до момента остановки актора и очистки ресурсов. Для джоба пользователя скоуп - это все, что находится внутри метода IJob.DoAsync. Вызов _scope.Dispose() также вызовет методы Dispose, если в имплементации IJob будет также унаследован IDispoosable интерфейс.

    protected override void PreStart()
    {
        _job = _scope.ServiceProvider.GetService<IJob<TIn, TOut>>();
    }
    protected override void PostStop()
    {
        _scope.Dispose();
    }

Запросы

JobContext позволяет запросить состояние акторов, которые были созданы для определенного типа IJob.


    public async Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId,
        TimeSpan? timeout = null)
    {
        var currentTimeout = timeout ?? _defaultTimeout;
        var query = new RequestAllWorkersInfo(requestId, GetGroupName(), currentTimeout);
        RespondAllWorkersInfo<TOut> info = await _masterActor
            .Ask<RespondAllWorkersInfo<TOut>>(query, currentTimeout);
        return info.WorkersData;
    }

Использование

Для регистрации необходимо при старте проекта вызвать методы

//Job library registration
builder.Services.AddScoped<IJob<ForEachJobInput, ForEachJobResult>, ForEachJob>();
builder.Services.AddJobContext();

Где ForEachJob — это простая тестовая задача, которая перебирает значения от 0 до Count c ожидаеним секунды между шагами. В данном тестовом джобе ForEachJob отдельно унаследован интерфейс IDisposable, что позволяет дополнительно определить механизм очистки ресурсов в момент остановки задачи или после её штатного завершения работы.

public class ForEachJob : IJob<ForEachJobInput, ForEachJobResult>, IDisposable
{
    private int _currentState;

    private readonly ILogger<ForEachJob> _logger;
    //Здесь возможна инъекция любого зарегестрированного в приложении интерфейса 
    public ForEachJob(ILogger<ForEachJob> logger)
    {
        _logger = logger;
    }

    public async Task<bool> DoAsync(ForEachJobInput input, CancellationToken token)
    {
        foreach (var item in Enumerable.Range(0, input.Count))
        {
            if (token.IsCancellationRequested)
                return false;

            _currentState = item;
            _logger.LogInformation(item.ToString());
            await Task.Delay(1000, token);
        }

        return true;
    }

    public ForEachJobResult GetCurrentState(Guid jobId)
    {
        return new ForEachJobResult
        {
            Id = jobId,
            Data = _currentState
        };
    }

    public void Dispose()
    {
        _logger.LogInformation("Dispose.");
    }

}

Вызов, например, из контроллера будет выглядить так:

public class ForEachJobController : ControllerBase
{
    private readonly IJobContext<ForEachJobInput, ForEachJobResult> _jobContext;
    
    public ForEachJobController(
      IJobContext<ForEachJobInput, ForEachJobResult> jobContext)
    {
        _jobContext = jobContext;
    }
    
    [HttpPost]
    [Route(nameof(CreateJob))]
    public async Task<JobCreatedCommandResult> CreateJob([FromBody] ForEachJobInput input)
    {
        return await _jobContext.CreateJobAsync(input);
    }
    
    [HttpPost]
    [Route(nameof(DoJob))]
    public async Task<JobDoneCommandResult> DoJob([FromBody] ForEachJobInput input)
    {
        return await _jobContext.DoJobAsync(input);
    }
    
    [HttpPost]
    [Route(nameof(StopJob))]
    public async Task<StopJobCommandResult> StopJob([FromBody] Guid jobId)
    {
        return await _jobContext.StopJobAsync(jobId);
    }
    
    [HttpGet]
    [Route(nameof(GetAllJobs))]
    public async Task<ICollection<ForEachJobResult?>> GetAllJobs(
      [FromQuery] int requestId)
    {
        var result = await _jobContext
            .GetAllJobsCurrentStatesAsync(requestId);
        return result.Values.Select(x => x.Result).ToList();
    }
}

Итог

Akka один из возможных способов запуска отложенных задач с потенциальной возможностью кластеризации решения.

Комментарии (10)


  1. dyadyaSerezha
    10.06.2023 02:34
    +2

    1) в мастеровом DoJobCommandHandler:

    groupActor.Forward(doJobCommand); _groupIdToActor.Add(doJobCommand.GroupName, groupActor); _actorToGroupId.Add(groupActor, doJobCommand.GroupName);

    Почему сначала вызывается метод Forward, а потом актор добавляется в словари? Методологически надо наоборот, а вызывать Forward вообще один раз для обеих ветвей (новый актор или старый).

    2) в Итоге впервые, без объяснений встречается слово akka. Надо в начале хотя бы в одном предложении заметить, что показана работа библиотеки akka.

    3) хорошо бы добавить обратные уведомления о прогрессе (проценте) выполненной задачи.


    1. Dglkdpodg Автор
      10.06.2023 02:34

      @dyadyaSerezha, 3 ий пункт выглядит, как очень даже интересная функциональность


      1. dyadyaSerezha
        10.06.2023 02:34
        +2

        А кстати, почему во всех словарях - groupId, а само свойство - groupName? Вот так, на мелочах, шпионы и прокалываются...)


  1. MonkAlex
    10.06.2023 02:34
    +5

    Постановка задачи непонятна.

    Чего хотели добиться то?

    Чтобы просто запустить кучу задач и знать статус их выполнения, достаточно сделать нужное количество Task.Run(...) и смотреть, не completed ли каждая нужная из них. Положите в структуру\класс, для того чтобы поименовать или как-то ещё отметить.


    1. Dglkdpodg Автор
      10.06.2023 02:34

      @MonkAlex, добрый день, задача состоит в создании интерфейса IJobContext, имплементация которого для метода DoAsync абстрагирует работу со скоупом Di, определит стратегию перезапуска задачи, в нужное время (StopAsync) самостоятельно отменит CancellationTokenSource и даст возможность получить нетолько результат работы задачи, но и в момент ее выполнения даст возможность обращаться к ее текущему состоянию. Голый таск ран вполне может быть использован для некоторых отдельных функций из списка выше, но для всех разом - это уже будет какая-то другая имплементация требуемого списка функциональности из IJobContext не акке, а на таск ране


      1. MonkAlex
        10.06.2023 02:34
        +3

        Не всё понял в вашем ответе, но поясню, к чему был мой вопрос. Когда вы пишите статью, очень неплохо для начала объяснить, какие задачи решаете. Т.е. пояснить, что вот есть задача А, со своими особенностями. И вот такой инструмент как Б, позволяет написав вот такой код, получить нужный результат.

        А вашу статю я читаю как "вот есть такой код, который позволяет получить что-то". В конце упомянута акка, что добавило мне понимания. Но всё ещё неясно, какую изначально задачу решаем.

        Отдельно отмечу, что акка - это обычно что-то на тему concurrent & distributed, что в статье не вижу, может пропустил.


    1. nronnie
      10.06.2023 02:34

      Чего хотели добиться то?

      Я так понял, что тут хотели показать примеры работы с каким-то фреймворком для подобных задач, но, как-то явно про это не написали, только ссылка в конце статьи.


  1. RouR
    10.06.2023 02:34
    +7

    Общая схема запросов будет выглядеть как MasterActor→ GroupActor → ManagerActor → WorkerActor

    Это оверинжиниринг, на мой взгляд. Ещё и акторы зачем-то.

    В .net есть BackgroundService и всё сводится к 3м сервисам:

    1. Общая очередь (можно в памяти, синглтон или статик ConcurrentQueue) с данными для таски

    2. BackgroundService который смотрит эту очередь и запускает таску

    3. Сервис с бизнес-логикой самой таски. Паттерн стратегия.


    1. Dglkdpodg Автор
      10.06.2023 02:34

      @RouR , согласен, если просто фономо нужно что-то запустить, то BackgroundService - отличное решение


      1. mvv-rus
        10.06.2023 02:34

        Чтобы просто фоном запустить что-нибудь достаточно Task.Run, чтобы передать команду на завершение - передать в исполняемый через делегат метод CancellationToken а в коде метода - смотреть, что он не отменен, чтобы отслеживать custom-состояние - сделать в классе volatile поле (с доступом снаружи к нему напрямую или через свойство), имеющее тип, обновления которого атомарны (т.е., грубо говоря - не struct) и писать туда из метода состояние по мере выполнения - и так далее, по мере усложнения задачи.

        Так что присоединяюсь к мнению@MonkAlex: код из вашей статьи решает не приведенную в начале текста простую задачу, а гораздо более сложную.