Привет, Хабр!
В многопоточном программирование постоянно нужно решать задачи, связанные с доступом к данным из нескольких потоков одновременно. И тут очень кстати ConcurrentBag — коллекция, которая была добавлена в .NET Framework 4.0 специально для таких случаев. Она подходит для ситуаций, где порядок элементов не важен. Если нужно просто кидать данные в мешок, пока куча потоков их туда же добавляет.
Поэтому ConcurrentBag создана для сценариев, где один и тот же поток часто и добавляет, и извлекает данные. Поэтому эта коллекция идеальна для паттернов типа Producer-Consumer, где один поток наполняет коллекцию, а другой извлекает данные для дальнейшей обработки. Но самое классное, что работает она по принципу work-stealing.
Как работает ConcurrentaBag
Thread-local storage: локальные очереди для каждого потока
В основе ConcurrentBag лежит интересный механизм — локальные очереди для каждого потока. То есть каждый поток, который добавляет данные в коллекцию, имеет свою собственную локальную очередь. Эти локальные очереди не изолированы полностью, их можно использовать для передачи данных между потоками, но основная идея заключается в том, что каждый поток работает со своей локальной копией данных.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>();
Task.Factory.StartNew(() =>
{
for (int i = 0; i < 5; i++)
{
bag.Add(i);
Console.WriteLine($"Thread {Task.CurrentId} added {i}");
}
});
Task.Factory.StartNew(() =>
{
foreach (var item in bag)
{
Console.WriteLine($"Thread {Task.CurrentId} accessed {item}");
}
});
Task.WaitAll(); // Ожидаем завершения задач
Здесь каждый поток добавляет элементы в локальную очередь и может получить доступ к данным. Однако, когда элементы добавляются одним потоком, а извлекаются другим, ConcurrentBag использует алгоритм work-stealing.
work-stealing
Алгоритм work-stealing делает эту коллекцию уникальной в плане многопоточной производительности. Суть в том, что каждый поток добавляет элементы в свою очередь, но если один поток заканчивает выполнение и его локальная очередь пустеет, он может украсть данные из очереди другого потока.
Представим, естьь пул потоков, и один поток только что освободился, а в других потоках еще есть незавершенные задачи. Вместо того чтобы простаивать, свободный поток просто крадет данные из другого потока, забирая элементы из конца его локальной очереди. Весь процесс происходит в фоновом режиме, и не нужно ничего дополнительно синхронизировать.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>();
Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
{
bag.Add(i);
}
});
Task.Factory.StartNew(() =>
{
int result;
while (bag.TryTake(out result))
{
Console.WriteLine($"Thread {Task.CurrentId} took {result}");
}
});
Task.WaitAll();
Так второй поток ворует данные из коллекции, добавленные первым потоком. Здесь видим работу метода TryTake
, который забирает элементы из коллекции, давая приоритет локальной очереди текущего потока. Если в ней нет элементов, поток переходит к поиску элементов в других потоках, выполняя "воровство" данных.
Когда возникают проблемы
Но не все так радужно, как может показаться на первый взгляд. Поскольку ConcurrentBag основан на локальных очередях и stealing-алгоритме, могут возникать ситуации гонок и конфликты. Например, когда один поток пытается украсть элементы у другого, могут возникать моменты, когда оба потока пытаются получить доступ к одному и тому же элементу. Для этого ConcurrentBag использует различные механизмы синхронизации, чтобы минимизировать конфликты.
Когда поток работает с тремя или более элементами в своей локальной очереди, никакая блокировка не требуется. Однако если в очереди меньше трех элементов, включается механизм блокировок для предотвращения гонок при доступе к одним и тем же данным другими потоками.
Когда нужно выполнить глобальные операции, например, ToArray
или Count
, ConcurrentBag вводит глобальную блокировку на уровне коллекции, замораживая структуру и предотвращая её изменения во время выполнения этих операций.
Когда поток пытается украсть данные, он использует SpinWait — небольшой цикл ожидания, пока другой поток завершит свои операции над локальной очередью, если есть риск конфликта.
Операции над данными в ConcurrentBag
Операция Add
Операция Add — это основа всего.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>();
Parallel.For(0, 10, (i) =>
{
bag.Add(i);
Console.WriteLine($"Thread {Task.CurrentId} added {i}");
});
Здесь каждый поток просто добавляет элементы в свой мешок, и никакой блокировки нет. Это и есть фича ConcurrentBag — локальные очереди уменьшают количество синхронизаций.
TryTake
Операция TryTake — это попытка забрать элемент из коллекции. Если поток пытается забрать элемент, он сначала пытается сделать это из своей локальной очереди. Если там ничего нет, включается тот самый механизм work-stealing, о котором мы уже говорили. Поток начинает красть элементы из других потоков, забирая данные из их локальных очередей.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>();
Parallel.For(0, 10, (i) => bag.Add(i));
int result;
if (bag.TryTake(out result))
{
Console.WriteLine($"Thread {Task.CurrentId} took {result}");
}
Итак, если у текущего потока есть элементы в его локальной очереди, они будут взяты именно оттуда. Если же нет, поток начнёт проверять, можно ли украсть элементы у других потоков.
TryPeek
TryPeek
похож на TryTake
, но с отличием: он не удаляет элемент из коллекции. Этот метод хорош, когда нужно просто заглянуть в коллекцию, не вмешиваясь в её структуру.
Пример:
int peekedItem;
if (bag.TryPeek(out peekedItem))
{
Console.WriteLine($"Thread {Task.CurrentId} peeked and saw {peekedItem}");
}
Стоит отметить, что в порядке Peek тоже нет гарантии, что элементы будут возвращаться в том порядке, в котором они были добавлены.
Методы ToArray и Count: зачем их избегать
Методы ToArray и Count в ConcurrentBag запускают глобальные блокировки на уровне коллекции. Их в основном юзают для того, чтобы сделать некую консистентность данных, но такие операции могут значительно замедлить работу программы.
Когда вы вызываете ToArray
, коллекция должна быть заморожена, чтобы гарантировать, что данные не изменятся, пока они копируются в массив. То же самое касается Count
— чтобы точно подсчитать количество элементов, нужно заблокировать все потоки, чтобы не допустить изменения коллекции во время вычисления.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>();
Parallel.For(0, 10, (i) => bag.Add(i));
int[] items = bag.ToArray();
Console.WriteLine($"Items in bag: {string.Join(", ", items)}");
Метод ToArray
может быть полезен для финального сбора данных, но его лучше избегать.
Примеры применения
Многопоточное кэширование объектов
Допустим, есть высоконагруженное веб-приложение, которое постоянно создает и уничтожает объекты — будь то соединения с базой данных, файловые дескрипторы или просто объекты в памяти. Создание объектов с нуля — это всегда дорого по производительности, поэтому создаём пул объектов, которые могут быть повторно использованы.
class ConnectionPool
{
private ConcurrentBag<DatabaseConnection> _connections = new ConcurrentBag<DatabaseConnection>();
public DatabaseConnection GetConnection()
{
if (_connections.TryTake(out var connection))
{
Console.WriteLine($"Reusing connection {connection.Id}");
return connection;
}
// Если нет доступных соединений, создаем новое
var newConnection = new DatabaseConnection();
Console.WriteLine($"Creating new connection {newConnection.Id}");
return newConnection;
}
public void ReturnConnection(DatabaseConnection connection)
{
_connections.Add(connection);
Console.WriteLine($"Returned connection {connection.Id} back to the pool");
}
}
В этом коде ConcurrentBag
используется как пул соединений. Когда поток запрашивает соединение, он пытается забрать его из мешка. Если соединений нет, создаётся новое. Когда работа завершена, соединение возвращается в пул для повторного использования другими потоками.
Асинхронное логирование в многопоточной среде
Можно использовать ConcurrentBag для хранения логов и асинхронного их сбора. Допустим, есть приложение, которое пишет логи с разных потоков, и потом они собираются и записываются в файл или базу данных.
Пример:
class Logger
{
private ConcurrentBag<string> _logMessages = new ConcurrentBag<string>();
public void LogMessage(string message)
{
_logMessages.Add($"{DateTime.Now}: {message}");
}
public void WriteLogsToFile()
{
foreach (var message in _logMessages)
{
Console.WriteLine($"Writing log: {message}");
// Пример записи в файл или базу данных
}
}
}
Здесь ConcurrentBag используется для хранения сообщений логов, которые добавляются асинхронно с разных потоков. Когда наступает время сбора логов (например, через каждые 10 минут), данные забираются из ConcurrentBag и записываются в файл или базу данных.
Паттерн Producer-Consumer
И вот мы подошли к, пожалуй, самому важному — Producer-Consumer. Это классический паттерн многопоточных приложений, где один или несколько потоков создают данные (Producers), а другие потоки их обрабатывают (Consumers). В этом случае ConcurrentBag — хороший кандидат для работы с такими задачами.
Пример:
class TaskProcessor
{
private ConcurrentBag<Action> _tasks = new ConcurrentBag<Action>();
public void ProduceTasks()
{
for (int i = 0; i < 10; i++)
{
var task = new Action(() => Console.WriteLine($"Processing task {i}"));
_tasks.Add(task);
Console.WriteLine($"Task {i} added");
}
}
public void ConsumeTasks()
{
while (_tasks.TryTake(out var task))
{
task();
Console.WriteLine("Task executed");
}
}
}
Здесь два метода: ProduceTasks
, который добавляет задачи в ConcurrentBag, и ConsumeTasks
, который забирает задачи и выполняет их.
Подробнее с ConcurrentBag можно ознакомиться здесь.
Всех, кому интересна тема работы с данными в C#, рекомендую посетить открытый урок «Linq на практике», который пройдет 22 октября.
После вебинара вы сможете писать свои linq‑запросы, опираясь на синтаксис linq, и сделать свою работу эффективней благодаря применению компараторов. Записаться на урок можно на странице курса «C# Developer».
Комментарии (9)
rotabor
11.10.2024 19:09А это, очевидно, какая-то новая логика:
Поэтому ConcurrentBag создана для сценариев, где один и тот же поток часто и добавляет, и извлекает данные. Поэтому эта коллекция идеальна для паттернов типа Producer-Consumer, где один поток наполняет коллекцию, а другой извлекает данные для дальнейшей обработки.
То ли один поток, то ли не один, то ли много...
Varim
11.10.2024 19:09если один поток заканчивает выполнение и его локальная очередь пустеет, он может украсть данные из очереди другого потока.
И как поток ворует данные из другого потока?
Mingun
11.10.2024 19:09Асинхронное логирование в многопоточной среде
Вообще-то, при логировании важен порядок поступающих сообщений, так что хранить их в коллекции, которая такой порядок не сохраняет, мягко говоря, бредовая идея.
Kanut
11.10.2024 19:09Зависит от того как вы логируете. Если вы пишите в базу данных и каждый лог имеет таймстэмп, то не особо принципиально в каком порядке вы их запишите в базу.
onyxmaster
11.10.2024 19:09Тут вопрос в том что происходит когда очередь переполняется, пусть даже временно. С FIFO понятную человеку политику пропуска/удаления сделать проще чем с контейнером который возвращает данные в, условно, произвольном порядке.
А так да, вероятно у всех современных распределённых систем журналирования есть поддержка записи с временной меткой в прошлом, просто окно не слишком большое должно быть.
onyxmaster
11.10.2024 19:09Поэтому ConcurrentBag создана для сценариев, где один и тот же поток часто и добавляет, и извлекает данные. Поэтому эта коллекция идеальна для паттернов типа Producer-Consumer, где один поток наполняет коллекцию, а другой извлекает данные для дальнейшей обработки
Странный вывод из верного посыла. ConcurrentBag медленнее чем ConcurrentQueue, если читают и пишут разные потоки, а традиционный producer-consumer обычно реализован именно так, какая-то группа потоков пишет, а какая-то — разгребает полученное. Смешивать эти группы обычно вредно и для понятности и для производительности. Конечно всегда есть async, где не узнать заранее на какой поток приземлится выполняемый код, но я бы постарался всё равно прибить или производителей или потребителей к одному и тому же набору потоков.
Ну и ещё есть BlockingCollection, который решает многие другие проблемы из реального мира, типа ограничений на размер (которые в случае ConcurrentXXX легко не решаются ибо Count там очень медленный, а добавлять внешний относительно коллекции Interlocked.Increment по производительности может оказаться неприятно).
kefirr
11.10.2024 19:09Многопоточное кэширование объектов
Для целей object pooling есть класc ObjectPool
Паттерн Producer-Consumer
См When to use a thread-safe collection:
In pure producer-consumer scenarios, System.Collections.Concurrent.ConcurrentBag<T> will probably perform more slowly than the other concurrent collection types.
In mixed producer-consumer scenarios, ConcurrentBag<T> is generally much faster and more scalable than any other concurrent collection type for both large and small workloads.
Baton34
Подумал какой-то новый синтаксис в шарпе, а это чудеса копипасты.
Task.Factory.StartNew(() =>