Привет, Хабр!
Кратко о том что такое потоковая обработка данных и в чем её отличие от пакетной.
Пакет данных, это часть информации поступающая в систему которая содержит законченный или не полный фрагмент данных. Большинство механизмов цифровой передачи информации в современных системах построены на пакетной передаче. Отличие потоковых и пакетных систем обработки в том, когда эти данные обрабатываются:
Потоковая обработка данных подразумевает обработку данных и реакцию на них непосредственно в момент получения с минимальными задержками.
Пакетная же в свою очередь занимается обработкой спустя установленные промежутки времени.
При проектировании систем потоковой обработки мы ожидаем получить ряд свойств:
Минимальные задержки в обработке, получении и отправки данных
Работу в режиме реального времени(предсказуемость времени выполнения)
Возможность с минимальными простоями менять конфигурацию системы
Вышеизложенные свойства накладывают определенные ограничения на подобного рода системы:
Мы не можем использовать языки со "сборкой мусора", текущие подходы к освобождению ресурсов подразумевают произвольную остановку рабочих потоков приложения.
По тем же причинам не подходят языки в которых трансляция и оптимизация иcходного кода производится в процессе выполнения.
Абстракция
Для того чтобы лучше понять потоковую обработку данных, лучше всего представлять её в виде простых структур данных таких как списки, деревья, и графы.
Пусть единицей обработки данных будет узел графа(Нода)
Каждый узел может иметь тип:
INPUT - принимает поток данных из источника(Диск, сеть, БД, или любой другой механизм межпроцессного взаимодействия)
OUTPUT - отправляет или сохраняет данные любым из возможных способов аналогично INPUT
PROCESSING - обрабатывает данные
Путем переупорядочивания узлов мы можем получать разные свойства системы, необходимые для решения поставленной задачи.
Список
Самой простой структурой будет односвязный список, данные идут в одном направлении от источника к назначению. В односвязном списке просто добавлять и удалять новые узлы,но он не подходит для реализации сложных сценариев.

Дерево
В древовидных структурах, появляется возможность ветвления, в качестве листьев могут быть несколько OUTPUT узлов.

Граф
Самый распространенный и гибкий тип, это граф, есть возможность цикличных путей, петель, и даже не связных подграфов обработки.

Выбор подходящей структуры
При выборе структуры нужно отталкиваться от технического задания. Будет ли несколько источников? В зависимости от входных данных нужно ли иметь несколько сценариев обработки? Сколько будет источников данных?
Пример 1
Входное изображение всегда JPEG, декодируем его и передаем в обработку, подойдёт односвязный список.
Пример 2
Входными данными может быть поток h264 или HEVC, в зависимости от типа входного видеопоток нужно перенаправить данные в разные узлы, тут может подойти древовидная структура.
Пример 3
Более сложным сценарием может быть обработка сетевых пакетов.
Допустим на вход подается ICMP/UDP/TCP. Датаграммы и ICMP пойдут в одни узлы,
а TCP потребует хранения состояния соединения, появляется необходимость обхода узлов обработки в разных направлениях. В подобных сценариях подойдет граф.
Общее описание архитектуры системы
Структура приложений выглядит примерно так:
/plugins - хранит плагины в виде разделяемых библиотек(на Linux это .so)
/app - хранит исполняемый файл приложения
/config - хранит описание подключаемых модулей(опционально)
В текстовом виде алгоритм работы можно описать так.
Во время запуска приложения, определяется количество подключаемых узлов обработки, производится их валидация и инициализация, затем идет этап планирования - строится граф выполнения, то есть какой узел какому передает данные. После подготовительного этапа, запускаются рабочие потоки в которых планировщик перекладывает данные от узла к узлу и запускает обработку.
Важным отличием систем, будет этап планирования. Часто планирование выполняется в процессе выполнения, каждая нода во время обработки данных передаёт информацию о том, куда дальше передать данные на обработку. Этот подход позволяет более гибко строить граф обработки, и реализовывать очень нетривиальные сценарии.
Задача
Ниже на синтетическом примере будет разобран первый тип - односвязный список.
Написать модуль для платформы видео аналитики, который будет принимать кадры в формате JPEG из сети, и сохранять на диск в BMP формате, так же нужно сохранять файл с результатами аналитики в виде CSV. В качестве тестового плагина обработки, добавить плагин который будет менять значения пикселей R и G местами.Так же модуль должен иметь возможность добавления нового функционала в граф обработки/аналитики.
Для начала перед проектированием системы нужно обозначить границы ответственности системы. Декодирование данных и модули аналитики, будет основной зоной ответственности, получение и отправка это второстепенные модули так как на задержки сети, сбои в работе API, повлиять сложно.
Структура тестового приложения
app/ ├── include/ │ └── infra.h ├── plugins/ │ ├── input_plugin.c | ├── processing_plugin.c | └── output_plugin.c ├── config/ | └── conf.json ├── core/ | └── main.c └── MakeFile
app - корневая директория
include - общие заголовочные файлы
plugins - исходные файлы плагинов
config - шаблоны конфигов
core - содержит main.c
В директории include создаем общий заголовочный файл infra.h, в котором определим общие для всех модулей типы данных.
Определим в виде перечисления три типа узлов.
infra.h
typedef enum NODE_TYPE_C { INPUT_NODE, OUTPUT_NODE, PROCESSING_NODE } NODE_TYPE_T;
Определим тип данных который мы будем передавать между узлами.
infra.h
// Пиксели изображения всегда в формате RGB typedef struct matrix_s { unsigned int weight_; unsigned int height_; unsigned char *data_; } matrix_t; typedef struct data_s { matrix_t matrix_; // Изображение char *metadata_; // Метаданные для дальнейшего хранения результатов аналитики } data_t;
Каждый узел обработки имеет два указателя prev, next указывающие на то куда передавать поток данных.Так же может потребоваться предварительная инициализацию плагина, для этого в качестве прототипов функций используются указатели, для определения сигнатуры и последующего полиморфного поведения узлов.
infra.h
typedef void (*init_function_t)(void); // Прототип функции инициализации typedef unsigned short (*processing_function_t)(data_t**, unsigned short); // Прототип функции обработки
Ниже определена структура узла системы.
infra.h
typedef struct node_s { char *name_; // Уникальное имя узла NODE_TYPE_T type_; // Тип INPUT/OUTPUT/PROCESSING char *prev_; // Имя предыдущего узла, для INPUT: NULL char *next_; // Имя следующего узла, для OUTPUT: NULL init_function_t init; // Метод инициализации ноды processing_function_t processing; // Метод обработки ноды } node_t;
Основной механизм подключения и удаления узлов в Linux - через разделяемые библиотеки(.so). В разных источниках такие библиотеки называют модулями или плагинами, ниже под этими понятиями будет подразумеваться разделяемая библиотека. В С/Linux для связывания во время выполнения, определения функций находятся в заголовочном файле dlfcn.h.
В каждом плагине нужно инициализировать структуру ноды которая будет хранить все необходимые поля и функцию для экспорта данной структуры. Для удобства определения заведем макрос которым, будем генерировать нужные сигнатуры. Чтобы избежать конфликтов в процессе динамического связывания все объявляемые глобальные переменные и функции нужно помечать как static.
infra.h
typedef node_t* (*get_node_structure)(void); // Сигнатура функции экспорта для получения структуры node_t из плагина #define REGISTER_NODE(plugin_name, type, processing_function, init_function, prev, next) \ static node_t node = {.name_ = plugin_name, .type_ = type, \ .init = init_function, .processing = processing_function, \ .prev_ = prev, .next_ = next}; \ node_t* getnode_structure() { return &node; }
Плагин получения потока данных(для простоты опущены части кода обращения к API).
input_plugin.c
static void init(void) { init_internal(); // Инициализируем внутренне состояние, подключаемся к API и т.д. log("Input plugin loaded"); } static unsigned short processing(data_t **data, unsigned short count) { if(data == NULL) return 0; unsigned short i = 0; for (; i < 5; ++i) { // Получаем изображения по сети, не более 5, чтобы не тормозить обработку, в реальных условиях настройки количество батчей нужно выносить в API unsigned char *jpeg_image = get_image(); // Получаем изображения if (jpeg_image == NULL) break; decode_image(data[i], jpeg_image); // Декодируем и сохраняем free(jpeg_image); } return i; // Хорошей практикой для планировщика будет возвращать количество получаемых данных } REGISTER_NODE("input-node", INPUT_NODE, processing, init, NULL, "processing-node"); // Регистрируем ноду, определяем в какой узел передавать данные
Плагин обработки потока данных, строится аналогиным образом.
processing_plugin.c
static void init(void) { log("Processing plugin loaded"); } static unsigned short processing(data_t **data, unsigned short count) { unsigned short i = 0; for (; i < count; ++i) { unsigned int sz = data[i].width_ * data[i].height_; // Вычисляем размер матрицы for (unsigned int j = 0; j < sz; j += 3) { // Так как j будет всегда будет указывать на R, меняем местами значение пикселей unsigned char pix = data[i].data_[j]; data[i].data_[j] = data[i].data_[j + 1]; data[i].data_[j + 1] = pix; } } return i; // Возвращаем количество обработанных изображения } REGISTER_NODE("processing-node", PROCESSING_NODE, processing, init, "input-node", "output-node");
Плагин отправки данных(для упрощения опущены обращения к API).
output_plugin.c
static void init(void) { log("Output plugin loaded"); } static unsigned short processing(data_t **data, unsigned short count) { unsigned short i = 0; for (; i < count; ++i) { save_data(data[i]); // Сохраняем изображение в формате BMP и metadata_ в csv free(data[i].data_); free(data[i].metadata_); } free(data); return i; } REGISTER_NODE("output-node", OUTPUT_NODE, processing, init, "pricessing-node", NULL);
Основной цикл обработки.
main.c
#define MAX_NODES (256) // Фиксируем максимальное количество узлов node_t *nodes[MAX_NODES]; // Определяем массив узлов memset(nodes, NULL, MAX_NODES); /*Читаем из конфига или командной строки, список разделяемых библиотек экспортируем из них узлы.*/ fill_nodes(argc, argv, nodes); // В нашем случае если прочитаем по порядку в массиве будут лежать три зарегистрированные ноды["input-node", "processing-node", "ouput-node"] init_nodes(nodes); // Вызываем функции инициализации // Заполняем порядок, ищем какая нода на какую ссылается по имени, и заполняем индексы // Определяем порядок, какая нада какой будет передавать данные unsigned char order[MAX_NODES]; unsigned char order_counter = fill_order(&order, nodes); // [0, 1, 2] - если прочитали в порядке как указано выше // Обход узлов списка, запуск обработки while(1) { data_t **data = NULL; unsigned short count = 0; for (unsigned short idx = 0; idx < order_counter; ++idx) { count = nodes[order[idx]]->processing(data, count); if (count == 0) { if (nodes[order[idx]].type_ != INPUT_NODE) { // Обработки исключительной ситуации } continue; } } }
Критический недостаток данной реализации новое считвание не происходит пока данные не пройдут по всем нодам, иллюстрация ниже.

В реальных же условиях поток данных не прерывно считывается из источника(ов), это позволяет не прерывно опрашивать источник уменьшая задержки, как это показанно на иллюстрации ниже.

Так же данный пример не учитывает многопоточную среду выполнения, что является одним из основных драйверов роста производительности на современных архитектурах.
Вывод
В качестве итогов, будут перечисленны плюсы данных подходов к разработке систем потоковой обработки данных:
Грамотно проектирования система передачи информации о модулях системе и планировщику, позволяет динамично менять порядок обработки не прерывая поток выполнения, или прерывая с минимальными потерями данных. Как пример приложение всегда может держать открытый сокет куда будет передаваться обновленная конфигурация графа и список плагинов с подключаемыми узлами.
Простота в масштабировании подобных архитектур в многоядерные среды. Каждый поток выполнения держит свой граф узлов, и работает с данными не конкурируя за ресурсы с другими обработчиками.
В современных серверных архитектурах достаточные объёмы данных помещаются в кеши, при хранении состояния между нодами, минимизируются промахи по кешам.
Данные можно обрабатывать пачками(батчами), что дает возможность дальнейших оптимизаций, например векторизации.
Ниже в комментариях напишите была ли статья полезна и о вашем опыте работы с аналогичными системами, и проектированием их. Особенно интересен ваш опыт работы с GStreamer или VPP.
Комментарии (14)

rukhi7
14.04.2026 17:34Выбор подходящей структуры
...
Пример 1
...Пример 2
...Пример 3
Это тут к чему, например? Что выбрали то? Выглядит так что выбирать передумали.

uEvg Автор
14.04.2026 17:34Доброго времени суток. Для простоты представления описаны три структуры, каждой из них сопоставлен пример. Возможно не понял вопроса, уточните пожалуйста.

rukhi7
14.04.2026 17:34Вопрос то, вроде, проще некуда: "Что выбрали?" У вас же озаглавлено: "Выбор подходящей структуры". Я надеялся на ваши пояснения смысла этого параграфа который выглядит, явно, не законченным.

valq7711
14.04.2026 17:34В любой поточной обработке есть ботлнек, проходимость которого должна быть ощутимо больше, чем максимальная нагрузка по входу, а иначе рано или поздно случится лаг (условно выход захлебнулся ненадолго) и при отсутствии уверенного запаса этот лаг может рассасываться очень долго. И что-то мне подсказывает, что на практике затраты на сборку мусора выглядят пылью на фоне этого запаса

uEvg Автор
14.04.2026 17:34Доброго времени суток. Много есть решений. Обычно используют очереди(или кольцевые буферы), чтобы чаще опрашивать источник, и прокручивать обработку без лага. Ещё хорошей практикой будет декомпозировать узкое место(разбить на несколько узлов), тогда аналогичным образом будем чаще опрашивать источник.

Apoheliy
14.04.2026 17:34Автор не рассмотрел один из интересных случаев, из-за которого потоковую обработку данных часто превращают в пакетную. (да-да, в пакетную).
... и это: ...
Когда несколько источников заходят в один граф. Например, в системах связи (звуковых) это конференция или совещание нескольких участников.
И как только приходят несинхронизированные данные от нескольких источников - сразу начинаешь задумываться об основном вопросе жизни, вселенной и всего остального. Последнее - это сарказм.
А без рассмотрения таких случаев получается всё "чистенько, но бедненько".

uEvg Автор
14.04.2026 17:34Доброго времени суток, вопрос синхронизации потоков из разных источников заслуживает отдельной статьи на самом деле. Спасибо за уточнение.

Jijiki
14.04.2026 17:34могут быть еще чанки, так же многопотоковая обработка чанков с Work Stealing - некии стадии - формирование чанка из конфигурации или соседних частей если они есть - это рефлекторный ответ - или начальная генерация ответа, которая может учитывать уже чанкование. тоесть картинку можно поделить на куски/чанки/области памяти и собирать чанки учитывая соседей, ну как в майнкрафте по-сути, там красивая стейт машина rec/send блоков, потоки - можно указать их число, а мир можно сделать бесконым, вот листы не знаю, может удобно, потомучто получается если смотреть на декодирование картинок в многопотоках, надо делить картинки на чанки учитывая число потоков, а они между собой вычисляют соседей и делают распаковку картинки, тоесть я думал что сначала надо поделить на воркеры, потом нагрузить воркеров, и очищать когда освобождается воркер.
тоесть если представить
строка "123456789"
выделим 9 чанков, кладём на воркеры.
и складываем ответ в строку "987654321" например, 8 потоков 8 воркеров и 1 стилинх поидее, ну образно, я себе так представляю потоковую обработку.
тоесть память нарезается в зависимости от хранилища поделенное на слоты, а воркеры в очереди на количестве потоков выполняют свою область чанка, стыкуя, стыки с соседними поидее.
на С это больно, я дошел только на Расте до этого, там многопоточка есть на mpsc -(mpsc есть и другие типы каналов - библиотеки ) минимальная хотябы, много бойлеркода скипается...
в купе с ускорением через lto и level-optimisation ваще летит клон этот )
vadimr
Сохранять на диск и принимать TCP, значит, можно, а собирать мусор нельзя?
uEvg Автор
Доброго времени суток, посыл в том что хочется добиться предсказуемого(прогнозируемого) поведения. Работа с сетью, с диском в подобного рода фреймворках может осуществляться асинхронно, не останавливая потоки обработки. Например, в DPDK + VPP, получение пакетов сетевой карты работает в режиме опроса(пулинга), и мы можем прогнозировать задержки получения пакетов.
Отвечая на ваш вопрос, сборка мусора с моем понимании, остановит рабочий поток произвольно, обойдет граф ресурсов для освобождения, в то время как можно было заниматься обработкой.
vadimr
На практике, хорошие современные инкрементальные сборщики мусора останавливают процессы в худшем случае на десяток миллисекунд. В то время ввод-вывод может создать значительно большие сложности, особенно при ошибках ввода-вывода. Да и вообще, всё что меньше 1/10 секунды, в операционных системах общего назначения малопредсказуемо.
ReadOnlySadUser
10 ms - это довольно много. За это время можно прилично трафика обработать)
Это не так, если ядра изолировать
uEvg Автор
Спасибо за уточнение, В рамках обработки трафика, например в межсетевых экранах доходит и до 10 в -9.
Ещё можно отключить прерывания на изолированных ядрах. В комбинации с выводом из-под планировщика, получаем почти систему реального времени.
Apoheliy
Предположу, что проблема со сборщиком мусора не только в том, что он останавливает процессы. И не в том, что он работает долго или быстро. Прим.: тем более, что выделение памяти может быть хуже: и происходит долго; и блокирует потоки.
Проблема со сборщиком мусора в том, что он ПОЗВОЛЯЕТ создавать "мусор". А когда его позволяют создавать, то сразу появляются желающие его создать (потому, что это проще и удобнее). И это часто приводит к тому, что память начинает выделяться/освобождаться намного чаще, чем нужно: объект -> в другой объект -> в третий объект ... и так далее ... и всё это память.
А если НЕ позволять создавать мусор, то всё начинает работать намного более предсказуемее (хотя и программировать может тяжелее). И язык Си очень хорошо подходит для того, чтобы не создавать мусор: запускается процесс, в котором либо на этапе запуска уже выделена память (например, через сегменты данных), либо (после чтения командной строки или конфигурационного файла) сразу выделяется требуемый набор массивов, списков, блоков памяти. И далее ведётся работа уже с готовой памятью.
Хочешь декодировать jpeg - пожалуйста! Только не в новую память! Вот есть специально выделенный блочок памяти под это - вот там и декодируй. Не хватает размера блочка? Ну либо в командной строке укажи больше размер (и он на старте выделится), либо ... ну не нужны тебе такие jpeg-и. Упс!
Уточнюсь: такими подходами можно писать код и на языках со сборкой мусора. Но обычно так не происходит: такие языки придумывают не для того, чтобы вручную памятью управлять; и программисты, которые десятки лет не следили за памятью, очень болезненно начинают смотреть на этот процесс.