Привет, Хабр! При создании фоновых работ, например, через Hangfire, может быть актуально учитывать разделяемые ресурсы (например, базы данных, внешнюю API или файловую систему). Поскольку такие ресурсы являются ограниченными, возникает потребность управления количеством параллельно исполняемых задач без написания сложной логики. Интересующимся особенностями распределения ресурсов в Hangfire при помощи очередей — пожаловать под кат :)

Часто есть задачи, которые обращаются к одному и тому же ресурсу (например, к базе данных), и известно, что ресурс поддерживает не более N параллельных операций, например,

  • база данных, которая способна обслужить максимум 2 тяжёлых запроса одновременно;

  • ограниченные лицензии на внешние API (например, только 5 параллельных вызовов);

  • работа с файловыми хранилищами или сетевыми дисками.

Вместо ручного контроля параллелизма (например, для C# это может быть SemaphoreSlim, блокировки и т. д.), можно использовать очереди Hangfire для распределения задач.

Текущая версия Hangfire (даже версия Hangfire GitHub, не Hangfire Pro) позволяет балансировать нагрузку через очереди Hangfire при наличии разделяемых ресурсов, что позволяет эффективно использовать возможности Hangfire и не писать новый код.

В Hangfire использование очередей в сочетании с созданием нескольких Hangfire серверов позволяет, например:

  • изолировать типы задач друг от друга;

  • сконфигурировать выполнение задач в рамках очередей;

  • ограничить количество параллельных задач в пределах одной очереди через настройку WorkerCount.

При необходимости для каждой очереди настроить свои параметры разделения ресурсов, необходимо создать несколько серверов Hangfire, например, через AddHangfireServer.

Допустим, есть кластер из 3 ClickHouse нод, и причем первый ClickHouse ch-node-1 выдерживает максимум 2 параллельных запроса, второй ch-node-2 — максимум 1 параллельный запрос, а третий ClickHouse ch-node-3 — максимум 3 запроса.

Фактически, каждый ClickHouse — это разделяемый ресурс, поэтому имеет смысл не имплементировать вручную логику разделения таких ресурсов, а использовать встроенные средства Hangfire. Стоит отметить, что это частный кейс работы с Hangfire, и рассматриваются базовые возможности Hangfire, не дополнительные возможности.

Создадим в Hangfire 3 очереди — ch-node-1, ch-node-2,ch-node-3, и на каждую добавляем по 2 воркера. Также создадим 3 сервера Hangfire — по одному серверу на каждый ClickHouse, чтобы указать параметры паралельной обработки запросов.

app.AddHangfireServer(new BackgroundJobServerOptions
{
    Queues = [ "ch-node-1" ],
    WorkerCount = 2
});
app.AddHangfireServer(new BackgroundJobServerOptions
{
    Queues = [ "ch-node-2" ],
    WorkerCount = 1
});
app.AddHangfireServer(new BackgroundJobServerOptions
{
    Queues = [ "ch-node-3" ],
    WorkerCount = 3
});

Это можно представить в виде следующей схемы.

Видно, что максимум 6 воркеров, причем условия по максимальному количеству воркеров для каждого ClickHouse выполняются (ch-node-1 — максимум 2 воркера, ch-node-2 — максимум 1 воркер, ch-node-3 — максимум 3 воркера), т.е. корректно разделяются ресурсы.

Для добавления работы в очередь достаточно указать имя очереди в Enqueue:

BackgroundJob.Enqueue<Services>("ch-node-1", x => x.DoWork());

Таким образом, нагрузка на каждый ресурс контролируется через очередь, и используются несколько Hangfire серверов для задания различных параметров и воркеров для каждой очереди.

Этот функционал можно имплементировать и вручную на C#, например, через редомендованный Microsoft семафор SemaphoreSlim для ограничения числа задач, очереди на основе BlockingCollection или TaskScheduler с кастомным пулом. Но это достаточно трудозатратно, сложнее поддерживать, не масштабируется по нескольким приложениям/нодам, а Hangfire решает такого вида вопросы из коробки.

Рассмотрим в качестве иллюстрации пример реализации через SemaphoreSlim. Для решения задачи, в которой требуется ограничить количество одновременных вызовов метода Services.DoWork() с использованием SemaphoreSlim, можно реализовать три отдельных семафора для трех ClickHouse (ch-node-1, ch-node-2, ch-node-3). Каждый семафор будет отвечать за управление числом разрешенных ресурсов для соответствующего ClickHouse.

static class Program
{
    private static readonly SemaphoreSlim SemaphoreClickHouseNode1 = new(2); // Для CH1 разрешены 2 ресурса
    private static readonly SemaphoreSlim SemaphoreClickHouseNode2 = new(1); // Для CH2 разрешен 1 ресурс
    private static readonly SemaphoreSlim SemaphoreClickHouseNode3 = new(3); // Для CH3 разрешены 3 ресурса

    public static async Task Main(string[] args)
    {
        // Пример запуска работы для каждого ClickHouse
        var tasks = new[]
        {
            RunClickHouseWork("ch-node-1"),
            RunClickHouseWork("ch-node-2"),
            RunClickHouseWork("ch-node-3")
        };

        await Task.WhenAll(tasks);
    }

    static async Task RunClickHouseWork(string clickHouseNodeName)
    {
        SemaphoreSlim semaphore = null;

        switch (clickHouseNodeName)
        {
            case "ch-node-1":
                semaphore = SemaphoreClickHouseNode1;
                break;
            case "ch-node-2":
                semaphore = SemaphoreClickHouseNode2;
                break;
            case "ch-node-3":
                semaphore = SemaphoreClickHouseNode3;
                break;
            default:
                throw new ArgumentException("Invalid ClickHouse node");
        }

        for (int i = 0; i < 5; i++) // Пример 5 операций на один ClickHouse 
        {
            await semaphore.WaitAsync();
            
            try
            {
                Console.WriteLine($"Starting work on {clickHouseNodeName} - Task {i}");
                await Services.DoWork(clickHouseNodeName, i); // Пример вызова работы
                Console.WriteLine($"Completed work on {clickHouseNodeName} - Task {i}");
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
}

public static class Services
{
    // Пример асинхронной работы
    public static async Task DoWork(string clickHouseNodeName, int taskNumber)
    {
        // Эмуляция асинхронной работы
        await Task.Delay(1000);
    }
}

В этой реализации создаются три семафора SemaphoreClickHouseNode1, SemaphoreClickHouseNode2, и SemaphoreClickHouseNode3, каждый из которых контролирует доступ к ресурсу для соответствующих ClickHouse. Функция RunClickHouseWork запускает симуляцию работы по ClickHouse, соблюдая ограничения семафоров. Services.DoWork эмулирует заданную работу, здесь был использован Task.Delay в качестве примера асинхронной задачи. Также важно не забывать оборачивать операции с семафором в блок try-finally, чтобы обеспечить корректный возврат ресурсов семафору даже в случае возникновения исключений.

Как видно, ручная реализация на C# без использования стандартной логики Hangfire может быть относительно трудоемкой.

Однако нужно иметь в виду ограничения бесплатной версии Hangfire по сравнению с Hangfire Pro, например, батчи и продолжения (Batches & Continuations).

Таким образом, Hangfire отлично подходит для задач, где важно контролировать нагрузку на ресурсы. Использование нескольких серверов и очередей в Hangfire позволяет управлять параллелизмом и распределять задачи по ресурсам, без необходимости написания кода. Успехов в организации очередей Hangfire :)

Комментарии (12)


  1. ValeriyPus
    20.07.2025 05:44

    Беда в том, что этот Hangfire не нужен.

    Маленькое Singlethread приложение - семафоры.

    Большие приложения - Akka. (https://getakka.net/articles/actors/dispatchers.html)

    И все бесплатно :)

    Просто появляется еще одна непонятная конфигурация (приложение, деплой, конфигурация контейнера, конфигурация Джобов(!))


    1. granit1986
      20.07.2025 05:44

      Почему Akka, а не родной Orleans?


      1. ValeriyPus
        20.07.2025 05:44

        Слишком страшная документация

        Получив сообщение подтверждения о запуске Silo , запустите клиент

        silo:

        1. an underground chamber in which a guided missile is kept ready for firing.

        А вообще - а зачем мне состояние в акторах? (Grains)

        А сохранять состояние акторов?

        А я не устану это отлаживать?

        upd:

        нашел статью по DDD+Orleans, сейчас почитаю

        https://habr.com/ru/articles/535452/


        1. granit1986
          20.07.2025 05:44

          Ну да, вроде как сервер надо запускать первым, а потом клиент. При этом потом сервер можно перезапускать и делать всё что угодно.
          Про состояния не понял - если не нужно состояние, то и не сохраняйте.
          С отладкой тоже не понятно в чём проблема.


          1. ValeriyPus
            20.07.2025 05:44

            Мороки много, для документооборота

            1) Как искать документы (акторы/Grain, Rich Domain Model) не только по ID (как Id матчей и т.д)? Через другой актор?

            У меня мой ConcurrentDictionary потянет 10к RPS, а потянет ли сеть до Silo? :)

            2) Могу ли я добавить\изменить поле

            3) Как делать транзакции

            4) Что с дедлоками

            5) Куда и как это сохраняется

            6) И т.д

            И надо книгу читать )

            Вторая проблема не так очевидна — это так называемая цепная реакция. Когда пользователь поднимает какой-то грейн, а тот в свою очередь может неявно поднять другие грейны в системе. Как это происходит: пользователь получает свои состояния, а у пользователя есть друзья и он получает состояния своих друзей. Таким образом, вся система держит все свои грейны в памяти и если у нас 1000 пользователей, и у каждого 100 друзей, то 100 000 грейнов могут быть активны просто так. Такого случая тоже нужно избегать — как-то хранить стейты друзей в какой-то общей памяти.

            В этом плане просто шины и кучи stateless обработчиков данных из шин гораздо проще в плане разработки\отладки\поддержки\масштабируемости.

            Мы завалили проект и у нас развалился весь флоу и пошли копипасты логики. Но вы хотите взять какой-то Orleans! Кто это все делать будет?
            Мы завалили проект и у нас развалился весь флоу и пошли копипасты логики. Но вы хотите взять какой-то Orleans! Кто это все делать будет?


            1. granit1986
              20.07.2025 05:44

              Так вам, судя по всему, здесь и Акка не поможет никак.

              Вряд ли кто будет это в вакансии писать. Обычно пишут общие требования.

              У меня вот есть в проде 2 сервиса с Orleans и они отлично справляются с тем что надо и я не парюсь с синхронизацией доступа и распределением нагрузки и данных.

              На самом деле он довольно простой и там не нужно каких-то специфических знаний, достаточно прочитать доки, благо они нормально написаны.


              1. ValeriyPus
                20.07.2025 05:44

                В акка состояние акторов не сохраняется (актор обрабатывает сообщения, отсылает сообщения). Т.е. в akka актор - прямо функция из ФП.

                В Orleans - Grain это богатый обьект (есть Grain без состояния, почти как актор в Akka).

                В Orleans нет индексирования (кроме полуофициального репозитория без Nuget пакета).

                Т.е. работать с обьектами можно очень быстро (они в оперативной памяти), а так же сразу из нескольких мест, так же это все можно масштабировать.

                Найти нужный обьект, кроме как по Id - нельзя :)

                Поэтому просто взять и использовать Orleans в СЭД - нельзя, придется городить кучу костылей.


                1. granit1986
                  20.07.2025 05:44

                  Ну так и в грейнах состояние не обязательно.
                  Да, индексирования нет, но ничего не мешает внутри грейна использовать, например, данные из базы. Так же ничего не мешает получить нужные Id из хранилища по условиям и потом уже использовать грейны.
                  Я делаю Select с нужными условиями в БД и вот уже есть нужные Id.

                  У меня есть пара грейнов с встроенным сохранением состояния т.к. знаю, что там не нужен поиск.

                  В Orleans - Grain это богатый обьект (есть Grain без состояния, почти как актор в Akka).

                  Нет - это просто объект с ключом и методами. Всё зависит от вашей реализации. Кстати, при получении грейна - ничего не происходит, вы просто получаете ссылку. Потом при обращении к нему да, если его нет, то он активируется, если уже есть, то просто вызывается нужный метод.


                  1. ValeriyPus
                    20.07.2025 05:44

                    Да, индексирования нет, но ничего не мешает внутри грейна использовать, например, данные из базы

                    Тогда получается дублирование функций хранилища грейнов. И основной плюс (грейны могут обрабатывать БОЛЬШЕ событий из разных источников, чем чтение\запись в базу) исчезает.

                    Так же исчезает второй плюс - не нужно ручками ничего писать (никакой инфраструктуры, с сохранением индексов и проч).

                    И чем это лучше, чем просто кучи акторов, связанные персистентными шинами? Ничем.

                    Для Игрового Лобби, Miro, сервиса сокращения URL - да, отлично подходит.

                    Для ЭДО - абсолютно не подходит.


  1. koanse Автор
    20.07.2025 05:44

    Наверно единого оптимального решения нет, если Hangfire уже используется и нет планов с него уходить, то можно и его более глубоко использовать.

    Akka .NET интересен, но в нем нужно переходить на акторы, и он подходит для всего приложения. Выглядит так, что его удобно использовать для новых проектов, но для существующих требует время на переход.


  1. impwx
    20.07.2025 05:44

    Какая криповая КДПВ. Персонаж за тобой следит


    1. koanse Автор
      20.07.2025 05:44

      Как вариант — следит за очередью, к кофе-машине