
Всем добрый день! Я технический специалист, работающий в системе внутреннего аудита, в мои обязанности входит создание инструментов ETL на языке программирования C#.
Периодически источниками данных становятся жестко структурированные файлы формата xml, csv, json или любого другого формата. Иногда их количество становится достаточно большим и постоянно увеличивающимся. Например, в одной из моих задач количество файлов увеличивалось со средней скоростью обновления примерно 150 000 файлов в сутки. Если при этом обработка одного файла (считывание массива байт с жесткого диска в память, трансформация загруженных данных и запись их в базу данных) занимает секунду, то становится понятно, что обработка всех файлов займет более 40 часов. В этом случае мы не сможем обработать эти файлы до конца, так как скорость увеличения количества файлов будет явно выше скорости их обработки.
Одно из решений данной проблемы – разработать приложение, в котором создать пул независимых друг от друга потоков. В потоках будут обрабатываться файлы, выбираемые из общей очереди. Однако в этом случае возникают сложности с синхронизацией работающих потоков и совместного использования ресурсов, так как очень вероятно появление взаимных блокировок.
Что бы избежать этих сложностей компания Microsoft добавила в фреймоворк .Net библиотеку TPL (начиная с версии 4.0). Я расскажу, как используя возможности этой библиотеки решить данную проблему.
Итак, изначально алгоритм работы выглядит следующим образом:
Сканируется каталог хранения файлов и возвращается список (например, List), содержащий данные о всех файлах;
Запускается цикл (for или foreach) в котором данные из очередного файла считываются в память, при необходимости трансформируются и записываются в БД.
Очевидно, что самые затратные по времени операции – это считывание данных с жесткого диска в память и запись данных из памяти в БД.
Попробуем оптимизировать наш алгоритм при помощи библиотеки TPL:
Пункт 1.
Изменим список, возвращаемый функцией сканирования каталога хранения файлов с List на ConcurrentQueue.
Для чего мы это делаем? Дело в том, что класс ConcurrentQueue является потокобезопасным, то есть если одновременно два потока попытаются извлечь данные из этого списка или записать в него данные, то у нас не возникнет исключений (Exception).
Пункт 1 нашего алгоритма будет выглядеть так: сканируется каталог хранения файлов и возвращается список ConcurrentQueue, содержащий данные о всех файлах.
Пункт 2:
Изменим конструкцию формирующую цикл обработки данных из файла. Заменим for на Parallel.For или Parallel.ForEach.
В чем отличие новой конструкции от for? Тут всё просто и в принципе понятно из названия языковой конструкции. Все итерации цикла выполняются в параллельных потоках. В качестве примера я покажу организацию цикла конструкцией Parallel.ForEach:
Parallel.ForEach(listFiles, (currentFile) =>
{
var dataFile = getDataFile(currentFile.FullName);
TransformData(dataFile);
WriteToDB(dataFile);
});
где:
listFiles – это коллекция типа ConcurrentQueue содержащая спи-сок файлов в каталоге;
currentFile – элемент коллекции listFiles, который возвращается конструк-цией ForEach;
dataFile – условная некоторая структура данных в памяти, получаемая счи-тыванием содержимого файла в память;
getDataFile – условная функция возвращающая содержимое файла в виде некоторой структуры данных;
TransformData – условная процедура трансформации полученных данных;
WriteToDB – условная процедура записи данных в БД.
В данном примере, с помощью конструкции Parallel.ForEach, мы организуем цикл. В этом цикле, в параллельных потоках, производится считывание данных с жесткого диска, их трансформация и запись в БД. При этом, проблемы организации работы параллельных потоков отсутствуют. Количество параллельных потоков зависит от числа ядер процессора и их загруженности.
Используя предложенный алгоритм, мы ускорим обработку файлов, как минимум в 2 раза. Хотя, конечно, эта цифра будет меняться в зависимости от количества ядер и памяти машины, на которой будет запускаться программа.
Так же для ускорения работы программы нужно вынести запись в БД в отдельный поток, работающий независимо от основного. Сделать это можно при помощи коллекции ConcurrentQueue, чтобы избежать конфликтов при добавлении данных в оче-редь.
Перепишем вышеприведенный пример с учетом оптимизации записи в БД.
Предположим, что процедура чтения файлов возвращает нам данные в DataTable):
Parallel.ForEach(listFiles, (currentFile) =>
{
DataTable dataFile = getDataFile(currentFile.FullName);
TransformData(dataFile);
threadWriteToDB.ListData.Enqueue(dataFile);
});
Как видно, вместо строки с вызовом процедуры записи в БД, мы просто добавляем в коллекцию ConcurrentQueue ListData описанную и инициализированную в отдельном потоке, экземпляр которого threadWriteToDB используется в нашем цикле.
Запись в БД происходит уже в отдельном потоке. Запись в БД можно организовать аналогично работе с файлами, с помощью конструкций Parallel.For и/или Paral-lel.Foreach.
В моей задаче, где потребовалась обработка сопоставимого количества файлов, сейчас может обрабатываться в среднем от 200 000 до 400 000 файлов в сутки, при чем скорость ограничивается загрузкой БД и шириной канала данных.
Griboks
А почему бы не использовать итератор, который по умолчанию ещё и является потокобезопасным для чтения.
Загрузим мы 8 гигов в оперативу и сломаем компьютер. Тут нужно искать tradeoff между скоростью и оперативкой. Для этого можно было бы возвращать буферезированный поток, а не полный массив.
Поэтому мы параллельно считываем? От этого скорость только упадёт из-за накладных расходов по сравнению с последовательным чтением.
NewTechAudit Автор
Добрый день, спасибо за комментарий.
1. А почему бы не использовать итератор, который по умолчанию ещё и является потокобезопасным для чтения.
Выдержка с сайта Microsoft:
“Все открытые и защищенные члены ConcurrentQueue являются потокобезопасными и могут использоваться одновременно из нескольких потоков.”
2. Загрузим мы 8 гигов в оперативу и сломаем компьютер. Тут нужно искать tradeoff между скоростью и оперативкой. Для этого можно было бы возвращать буферезированный поток, а не полный массив.
Да, такая возможность существует. Но в статье не приводится универсальный код, а взят пример из реальной задачи, в которой есть знание максимально возможного размера обрабатываемого файла (~1 мегабайт) и знание того, что среда выполнения .Net никогда не выделит столько потоков под конструкцию Parallel.Foreach (или Parallel.For), что бы забить полностью доступную нам оперативную память.
Для примера: если предположить, что нам доступны 6 гигабайт, то 6*1024 потоков единовременно или в течении краткого периода времени никогда выделено не будет. Таким образом, вероятность того что мы загрузим ОЗУ полностью стремится к нулю. И даже если предположить, что всё-таки машина сможет выделить столько потоков то компьютер мы не сломаем. Просто приложение будет закрыто с ошибкой переполнения памяти (хотя надо признать, что в этот момент действительно возможны будут замедления работы других процессов).
3. Поэтому мы параллельно считываем? От этого скорость только упадёт из-за накладных расходов по сравнению с последовательным чтением.
Да именно поэтому. Упадет скорость или нет – это достаточно спорный вопрос, дискуссии по которому шли и периодически до сих пор вспыхивают в сети. Падение скорости более вероятно на классических HDD, если же применяются SSD диски, то соответственно эта вероятность будет существенно ниже. Конкретно в нашем случае – было ускорение обработки файлов в несколько раз. Возможно заменить его на параллельную запись в БД, конструкция останется той же самой.
SpiderEkb
А как вы контролируете потоки? Ну вот какая-то нештатная ситуация и поток встал. Или зациклился. Что будет в этом случае? Как вы реагируете на такое?