Привет, Хабр!

В многопоточном программирование постоянно нужно решать задачи, связанные с доступом к данным из нескольких потоков одновременно. И тут очень кстати ConcurrentBag — коллекция, которая была добавлена в .NET Framework 4.0 специально для таких случаев. Она подходит для ситуаций, где порядок элементов не важен. Если нужно просто кидать данные в мешок, пока куча потоков их туда же добавляет.

Поэтому ConcurrentBag создана для сценариев, где один и тот же поток часто и добавляет, и извлекает данные. Поэтому эта коллекция идеальна для паттернов типа Producer-Consumer, где один поток наполняет коллекцию, а другой извлекает данные для дальнейшей обработки. Но самое классное, что работает она по принципу work-stealing.

Как работает ConcurrentaBag

Thread-local storage: локальные очереди для каждого потока

В основе ConcurrentBag лежит интересный механизм — локальные очереди для каждого потока. То есть каждый поток, который добавляет данные в коллекцию, имеет свою собственную локальную очередь. Эти локальные очереди не изолированы полностью, их можно использовать для передачи данных между потоками, но основная идея заключается в том, что каждый поток работает со своей локальной копией данных.

Пример:

ConcurrentBag<int> bag = new ConcurrentBag<int>();

Task.Factory.StartNew(() =>
{
    for (int i = 0; i < 5; i++)
    {
        bag.Add(i);
        Console.WriteLine($"Thread {Task.CurrentId} added {i}");
    }
});

Task.Factory.StartNew(() =>
{
    foreach (var item in bag)
    {
        Console.WriteLine($"Thread {Task.CurrentId} accessed {item}");
    }
});

Task.WaitAll();  // Ожидаем завершения задач

Здесь каждый поток добавляет элементы в локальную очередь и может получить доступ к данным. Однако, когда элементы добавляются одним потоком, а извлекаются другим, ConcurrentBag использует алгоритм work-stealing.

work-stealing

Алгоритм work-stealing делает эту коллекцию уникальной в плане многопоточной производительности. Суть в том, что каждый поток добавляет элементы в свою очередь, но если один поток заканчивает выполнение и его локальная очередь пустеет, он может украсть данные из очереди другого потока.

Представим, естьь пул потоков, и один поток только что освободился, а в других потоках еще есть незавершенные задачи. Вместо того чтобы простаивать, свободный поток просто крадет данные из другого потока, забирая элементы из конца его локальной очереди. Весь процесс происходит в фоновом режиме, и не нужно ничего дополнительно синхронизировать.

Пример:

ConcurrentBag<int> bag = new ConcurrentBag<int>();

Task.Factory.StartNew(() =>
{
    for (int i = 0; i < 10; i++)
    {
        bag.Add(i);
    }
});

Task.Factory.StartNew(() =>
{
    int result;
    while (bag.TryTake(out result))
    {
        Console.WriteLine($"Thread {Task.CurrentId} took {result}");
    }
});

Task.WaitAll();

Так второй поток ворует данные из коллекции, добавленные первым потоком. Здесь видим работу метода TryTake, который забирает элементы из коллекции, давая приоритет локальной очереди текущего потока. Если в ней нет элементов, поток переходит к поиску элементов в других потоках, выполняя "воровство" данных.

Когда возникают проблемы

Но не все так радужно, как может показаться на первый взгляд. Поскольку ConcurrentBag основан на локальных очередях и stealing-алгоритме, могут возникать ситуации гонок и конфликты. Например, когда один поток пытается украсть элементы у другого, могут возникать моменты, когда оба потока пытаются получить доступ к одному и тому же элементу. Для этого ConcurrentBag использует различные механизмы синхронизации, чтобы минимизировать конфликты.

Когда поток работает с тремя или более элементами в своей локальной очереди, никакая блокировка не требуется. Однако если в очереди меньше трех элементов, включается механизм блокировок для предотвращения гонок при доступе к одним и тем же данным другими потоками.

Когда нужно выполнить глобальные операции, например, ToArray или Count, ConcurrentBag вводит глобальную блокировку на уровне коллекции, замораживая структуру и предотвращая её изменения во время выполнения этих операций.

Когда поток пытается украсть данные, он использует SpinWait — небольшой цикл ожидания, пока другой поток завершит свои операции над локальной очередью, если есть риск конфликта.

Операции над данными в ConcurrentBag

Операция Add

Операция Add — это основа всего.

Пример:

ConcurrentBag<int> bag = new ConcurrentBag<int>();

Parallel.For(0, 10, (i) =>
{
    bag.Add(i);
    Console.WriteLine($"Thread {Task.CurrentId} added {i}");
});

Здесь каждый поток просто добавляет элементы в свой мешок, и никакой блокировки нет. Это и есть фича ConcurrentBag — локальные очереди уменьшают количество синхронизаций.

TryTake

Операция TryTake — это попытка забрать элемент из коллекции. Если поток пытается забрать элемент, он сначала пытается сделать это из своей локальной очереди. Если там ничего нет, включается тот самый механизм work-stealing, о котором мы уже говорили. Поток начинает красть элементы из других потоков, забирая данные из их локальных очередей.

Пример:

ConcurrentBag<int> bag = new ConcurrentBag<int>();

Parallel.For(0, 10, (i) => bag.Add(i));

int result;
if (bag.TryTake(out result))
{
    Console.WriteLine($"Thread {Task.CurrentId} took {result}");
}

Итак, если у текущего потока есть элементы в его локальной очереди, они будут взяты именно оттуда. Если же нет, поток начнёт проверять, можно ли украсть элементы у других потоков.

TryPeek

TryPeek похож на TryTake, но с отличием: он не удаляет элемент из коллекции. Этот метод хорош, когда нужно просто заглянуть в коллекцию, не вмешиваясь в её структуру.

Пример:

int peekedItem;
if (bag.TryPeek(out peekedItem))
{
    Console.WriteLine($"Thread {Task.CurrentId} peeked and saw {peekedItem}");
}

Стоит отметить, что в порядке Peek тоже нет гарантии, что элементы будут возвращаться в том порядке, в котором они были добавлены.

Методы ToArray и Count: зачем их избегать

Методы ToArray и Count в ConcurrentBag запускают глобальные блокировки на уровне коллекции. Их в основном юзают для того, чтобы сделать некую консистентность данных, но такие операции могут значительно замедлить работу программы.

Когда вы вызываете ToArray, коллекция должна быть заморожена, чтобы гарантировать, что данные не изменятся, пока они копируются в массив. То же самое касается Count — чтобы точно подсчитать количество элементов, нужно заблокировать все потоки, чтобы не допустить изменения коллекции во время вычисления.

Пример:

ConcurrentBag<int> bag = new ConcurrentBag<int>();

Parallel.For(0, 10, (i) => bag.Add(i));

int[] items = bag.ToArray();
Console.WriteLine($"Items in bag: {string.Join(", ", items)}");

Метод ToArray может быть полезен для финального сбора данных, но его лучше избегать.

Примеры применения

Многопоточное кэширование объектов

Допустим, есть высоконагруженное веб-приложение, которое постоянно создает и уничтожает объекты — будь то соединения с базой данных, файловые дескрипторы или просто объекты в памяти. Создание объектов с нуля — это всегда дорого по производительности, поэтому создаём пул объектов, которые могут быть повторно использованы.

class ConnectionPool
{
    private ConcurrentBag<DatabaseConnection> _connections = new ConcurrentBag<DatabaseConnection>();

    public DatabaseConnection GetConnection()
    {
        if (_connections.TryTake(out var connection))
        {
            Console.WriteLine($"Reusing connection {connection.Id}");
            return connection;
        }

        // Если нет доступных соединений, создаем новое
        var newConnection = new DatabaseConnection();
        Console.WriteLine($"Creating new connection {newConnection.Id}");
        return newConnection;
    }

    public void ReturnConnection(DatabaseConnection connection)
    {
        _connections.Add(connection);
        Console.WriteLine($"Returned connection {connection.Id} back to the pool");
    }
}

В этом коде ConcurrentBag используется как пул соединений. Когда поток запрашивает соединение, он пытается забрать его из мешка. Если соединений нет, создаётся новое. Когда работа завершена, соединение возвращается в пул для повторного использования другими потоками.

Асинхронное логирование в многопоточной среде

Можно использовать ConcurrentBag для хранения логов и асинхронного их сбора. Допустим, есть приложение, которое пишет логи с разных потоков, и потом они собираются и записываются в файл или базу данных.

Пример:

class Logger
{
    private ConcurrentBag<string> _logMessages = new ConcurrentBag<string>();

    public void LogMessage(string message)
    {
        _logMessages.Add($"{DateTime.Now}: {message}");
    }

    public void WriteLogsToFile()
    {
        foreach (var message in _logMessages)
        {
            Console.WriteLine($"Writing log: {message}");
            // Пример записи в файл или базу данных
        }
    }
}

Здесь ConcurrentBag используется для хранения сообщений логов, которые добавляются асинхронно с разных потоков. Когда наступает время сбора логов (например, через каждые 10 минут), данные забираются из ConcurrentBag и записываются в файл или базу данных.

Паттерн Producer-Consumer

И вот мы подошли к, пожалуй, самому важному — Producer-Consumer. Это классический паттерн многопоточных приложений, где один или несколько потоков создают данные (Producers), а другие потоки их обрабатывают (Consumers). В этом случае ConcurrentBag — хороший кандидат для работы с такими задачами.

Пример:

class TaskProcessor
{
    private ConcurrentBag<Action> _tasks = new ConcurrentBag<Action>();

    public void ProduceTasks()
    {
        for (int i = 0; i < 10; i++)
        {
            var task = new Action(() => Console.WriteLine($"Processing task {i}"));
            _tasks.Add(task);
            Console.WriteLine($"Task {i} added");
        }
    }

    public void ConsumeTasks()
    {
        while (_tasks.TryTake(out var task))
        {
            task();
            Console.WriteLine("Task executed");
        }
    }
}

Здесь два метода: ProduceTasks, который добавляет задачи в ConcurrentBag, и ConsumeTasks, который забирает задачи и выполняет их.

Подробнее с ConcurrentBag можно ознакомиться здесь.


Всех, кому интересна тема работы с данными в C#, рекомендую посетить открытый урок «Linq на практике», который пройдет 22 октября.

После вебинара вы сможете писать свои linq‑запросы, опираясь на синтаксис linq, и сделать свою работу эффективней благодаря применению компараторов. Записаться на урок можно на странице курса «C# Developer».

Комментарии (8)


  1. Baton34
    11.10.2024 19:09

    Task.Factory.StartNew(() =&gt;

    Подумал какой-то новый синтаксис в шарпе, а это чудеса копипасты.

    Task.Factory.StartNew(() =>


  1. rotabor
    11.10.2024 19:09

    А это, очевидно, какая-то новая логика:

    Поэтому ConcurrentBag создана для сценариев, где один и тот же поток часто и добавляет, и извлекает данные. Поэтому эта коллекция идеальна для паттернов типа Producer-Consumer, где один поток наполняет коллекцию, а другой извлекает данные для дальнейшей обработки.

    То ли один поток, то ли не один, то ли много...


    1. nronnie
      11.10.2024 19:09

      То ли

      То ли AI писал.


  1. Varim
    11.10.2024 19:09

    если один поток заканчивает выполнение и его локальная очередь пустеет, он может украсть данные из очереди другого потока.

    И как поток ворует данные из другого потока?


  1. Mingun
    11.10.2024 19:09

    Асинхронное логирование в многопоточной среде

    Вообще-то, при логировании важен порядок поступающих сообщений, так что хранить их в коллекции, которая такой порядок не сохраняет, мягко говоря, бредовая идея.


    1. Kanut
      11.10.2024 19:09

      Зависит от того как вы логируете. Если вы пишите в базу данных и каждый лог имеет таймстэмп, то не особо принципиально в каком порядке вы их запишите в базу.


      1. onyxmaster
        11.10.2024 19:09

        Тут вопрос в том что происходит когда очередь переполняется, пусть даже временно. С FIFO понятную человеку политику пропуска/удаления сделать проще чем с контейнером который возвращает данные в, условно, произвольном порядке.

        А так да, вероятно у всех современных распределённых систем журналирования есть поддержка записи с временной меткой в прошлом, просто окно не слишком большое должно быть.


  1. onyxmaster
    11.10.2024 19:09

    Поэтому ConcurrentBag создана для сценариев, где один и тот же поток часто и добавляет, и извлекает данные. Поэтому эта коллекция идеальна для паттернов типа Producer-Consumer, где один поток наполняет коллекцию, а другой извлекает данные для дальнейшей обработки

    Странный вывод из верного посыла. ConcurrentBag медленнее чем ConcurrentQueue, если читают и пишут разные потоки, а традиционный producer-consumer обычно реализован именно так, какая-то группа потоков пишет, а какая-то — разгребает полученное. Смешивать эти группы обычно вредно и для понятности и для производительности. Конечно всегда есть async, где не узнать заранее на какой поток приземлится выполняемый код, но я бы постарался всё равно прибить или производителей или потребителей к одному и тому же набору потоков.

    Ну и ещё есть BlockingCollection, который решает многие другие проблемы из реального мира, типа ограничений на размер (которые в случае ConcurrentXXX легко не решаются ибо Count там очень медленный, а добавлять внешний относительно коллекции Interlocked.Increment по производительности может оказаться неприятно).