Написание этой статьи навеяно статьёй "Task изнутри: управление потоками в .NET и создание своих планировщиков". Одна из проблем озвученных в данной статье это ограничение числа одновременно выполняемых задач.

Немного подробнее о задаче.

В современном .NET для выполнения задачи в фоне, (запуск операций на потоке отличном от текущего) достаточно вызвать Task.Run(). Данный подход очень удобен в использовании, но в фон может быть отправлено неограниченное количество операций, при этом все операции начнут выполнение незамедлительно.
Бывают случаи, когда нужно контролировать потребление ресурсов сервера. Самый простой пример — это ограничения на оперативную память, то есть приложение не должно превысить заданный порог потребления. В таком случае нужно каким-то образом ограничить одновременное выполнение задач в фоне.

В указанной выше статье для таких целей использовался кастомный планировщик. Полагаю многим пришла в голову мысль о возможности создании решения на основе SemaphoreSlim, я же решил его воплотить и проверить насколько проще такое решение, чем кастомный планировщик.

Первое решение задачи.

В первую очередь нацеленно на быстроту воплощения и малый объём кода. Оно не обладает гибкостью, к примеру нет контроля над очередью выполнения задач, отправленных в фон.

public sealed class BackgroundProcessor : IDisposable
{
    private readonly ILogger<BackgroundProcessor> _logger;
    private readonly SemaphoreSlim _semaphoreSlim;

    public BackgroundProcessor(ILogger<BackgroundProcessor> logger, IOptions<BackgroundProcessingCfg> options)
    {
        _logger = logger;
        _semaphoreSlim = new SemaphoreSlim(options.Value.Parallelism, options.Value.Parallelism);
    }

    public void Proceed(Func<Task> task, CancellationToken cancellationToken) =>
        Task.Run(async () =>
        {
            try
            {
                await _semaphoreSlim.WaitAsync(cancellationToken);
                await task.Invoke();
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, "_");
            }
            finally
            {
                _semaphoreSlim.Release();
            }
        }, cancellationToken);

    public void Proceed(Action action, CancellationToken cancellationToken) =>
        Task.Run(async () =>
        {
            try
            {
                await _semaphoreSlim.WaitAsync(cancellationToken);
                action.Invoke();
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, "_");
            }
            finally
            {
                _semaphoreSlim.Release();
            }
        }, cancellationToken);

    public void Dispose() => _semaphoreSlim.Dispose();
}

Вся логика кроется в классе BackgroundProcessor. Ограничителем количества одновременно выполняемых задач является SemaphoreSlim, его параметры инициализации можно задать через конфигурацию при старте приложения.

await _semaphoreSlim.WaitAsync(cancellationToken);
await task.Invoke();

Задача, отправленная в фон, начнёт выполняться только тогда, когда в семафоре появится «свободное место». Количество «свободных мест» можно посмотреть через свойство CurrentCount и при необходимости вывести наружу.

Второе решение.

Оно чуть сложнее и имеет другой характер использования.

public sealed class BackgroundProcessorQueue : IDisposable
{
    private readonly ILogger<BackgroundProcessorQueue> _logger;
    private readonly SemaphoreSlim _semaphoreSlimQueue;

    public BackgroundProcessorQueue(ILogger<BackgroundProcessorQueue> logger, IOptions<BackgroundProcessingCfg> options)
    {
        _logger = logger;
        _semaphoreSlimQueue = new SemaphoreSlim(options.Value.ConcurrentQueue, options.Value.ConcurrentQueue);
    }

    public async Task AwaitProceed(Func<Task> task, CancellationToken cancellationToken)
    {
        await _semaphoreSlimQueue.WaitAsync(cancellationToken);

        _ = Task.Run(async () =>
        {
            try
            {
                await task.Invoke();
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, "_");
            }
            finally
            {
                _semaphoreSlimQueue.Release();
            }
        }, cancellationToken);
    }
    
    public async Task AwaitProceed(Action task, CancellationToken cancellationToken)
    {
        await _semaphoreSlimQueue.WaitAsync(cancellationToken);

        _ = Task.Run(() =>
        {
            try
            {
                task.Invoke();
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, "_");
            }
            finally
            {
                _semaphoreSlimQueue.Release();
            }
        }, cancellationToken);
    }

    public int AvailableSpace() => _semaphoreSlimQueue.CurrentCount;
    
    public void Dispose() => _semaphoreSlimQueue.Dispose();
}

Данный обработчик позволяет отправить в фон единоразово только количество задач равное «свободному месту» в семафоре, в случае его отсутствия вызывающий метод доложен дождаться освобождения семафора.

public async Task AwaitProceed(Func<Task> task, CancellationToken cancellationToken)
{
    await _semaphoreSlimQueue.WaitAsync(cancellationToken);

    _ = Task.Run(async () =>
    ***

Такой подход достигается другим расположением блокировки семафора - сразу после входа в метод выполняется ожидание семафора, который так же как и до этого ограничивает параллелизм.
При этом есть нюансы использования, к примеру несколько потребителей этого метода будут конкурировать за взятие блокировки (если такой термин уместен) у семафора, то есть один из потребителей может ожидать своей очереди дольше остальных.
Код проекта доступен на GitHub.

Что вы думаете о таком подходе к ограничению параллелизма выполнения фоновых задач?

Комментарии (1)


  1. Sicarius
    09.12.2024 04:43

    Возвращаясь к теме предыдущей статьи: думаю что в проде стоит использовать готовое решение