
«Компетентный программист полностью осознаёт строго ограниченный размер собственного черепа», — Эдсгер Дейкстра
Оглавление
Глава 1: Разрыв в производительности
Глава 3: Бенчмаркинг и профилирование
Глава 4: Массивы и локальность кэша
Глава 5: Связанные списки — убийцы кэша
Глава 7: Хэш-таблицы и конфликты кэша
Глава 8: Динамические массивы и управление памятью
Глава 9: Двоичные деревья поиска
Глава 10: B-деревья и деревья, эффективно использующие кэш
Глава 11: Префиксные деревья и базисные деревья
Глава 12: Кучи и очереди с приоритетом
Глава 13: Структуры данных без блокировок
Глава 14: Обработка строк и эффективность использования кэша
Глава 15: Графы и их обход с эффективным использованием кэша
Глава 16: Фильтры Блума и вероятностные структуры данных
Глава 17: Структуры данных загрузчиков
Глава 18: Очереди драйверов устройств
Загадка утери пакетов
Наш сетевой драйвер терял пакеты. Не время от времени, а постоянно. На пропускной способности линии с 64-байтными пакетами мы теряли 31% всего трафика.
В качестве оборудования использовался Ethernet-контроллер на 1 Гбит/с на SoC RISC-V. В спецификациях говорилось, что он может справляться со скоростью проводного трафика. Движок DMA работал корректно. Обработчик прерываний срабатывал вовремя. Тем не менее, пакеты исчезали.
Я начал с очевидного подозреваемого: очереди получения. Реализация выглядела вполне логично — простой связанный список с указателями на голову и хвост:
typedef struct rx_buffer { uint8_t data[2048]; size_t len; struct rx_buffer *next; } rx_buffer_t; typedef struct { rx_buffer_t *head; rx_buffer_t *tail; spinlock_t lock; } rx_queue_t; void rx_enqueue(rx_queue_t *q, rx_buffer_t *buf) { spin_lock(&q->lock); buf->next = NULL; if (q->tail) { q->tail->next = buf; } else { q->head = buf; } q->tail = buf; spin_unlock(&q->lock); } rx_buffer_t *rx_dequeue(rx_queue_t *q) { spin_lock(&q->lock); rx_buffer_t *buf = q->head; if (buf) { q->head = buf->next; if (!q->head) { q->tail = NULL; } } spin_unlock(&q->lock); return buf; }
Под нагрузкой (64-байтные пакеты на пропускной способности линии) драйвер терял пакеты:
$ iperf3 -c 192.168.1.100 -u -b 1G -l 64 [ 5] 0.00-10.00 sec 714 MBytes 599 Mbits/sec [ 5] Packets sent: 5,950,000 [ 5] Packets lost: 1,850,000 (31.1%) Packet loss: 31.1% Throughput: 599 Mbps (target: 1000 Mbps)
Потеря 31% пакетов! При профилировании обнаружилась причина проблемы:
$ perf record -e cycles,cache-misses ./network_driver $ perf report 42.3% rx_enqueue/rx_dequeue 28.5% Spinlock contention 18.7% Packet processing 10.5% Other Cache misses: 18.5M per second
Производительность убивали связанный список и спин-блокировки.
Я переписал драйвер, использовав кольцевой буфер без блокировок. Результаты:
$ iperf3 -c 192.168.1.100 -u -b 1G -l 64 [ 5] 0.00-10.00 sec 1.19 GBytes 1.02 Gbits/sec [ 5] Packets sent: 9,850,000 [ 5] Packets lost: 12,000 (0.12%) Packet loss: 0.12% Throughput: 1020 Mbps (exceeds target!) Cache misses: 2.8M per second (6.6× fewer)
Потеря 31% пакетов превратилась в 0,12% — улучшение в 258 раз!
В этой главе мы поговорим о структуре очередей для драйверов устройств.
Окружение драйвера устройства
Драйверы устройств работают в уникальном окружении:
1. Контекст прерываний
Ограничения:
Нельзя уходить в сон (отсутствие блокирующих операций)
Нельзя распределять память (malloc может уйти в сон)
Необходима скорость (чтобы поспевать за прерываниями)
Ограниченное пространство стека (часто 4 КБ или меньше)
Вывод: необходимы заранее распределённые буферы без блокировок или с очень быстрой блокировкой.
2. Шаблон «производитель-потребитель»
Типичный поток исполнения:
Производитель: обработчик прерываний получает данные от оборудования
Потребитель: поток ядра или пользовательский процесс считывает данные
Вывод: требуется эффективная очередь для передачи данных между контекстами.
3. Высокая скорость обработки
Требования:
Сеть: 1-100 Гбит/с (миллионы пакетов в секунду)
Накопитель: 1-10 ГБ/с (тысячи операций ввода-вывода в секунду)
Последовательный интерфейс: 1-10 Мбит/с (тысячи байт в секунду)
Вывод: важен каждый такт. Необходимы удобные для кэша структуры данных.
4. Ограниченная память
Ограничения:
Безграничный рост невозможен (память ядра ограничена)
Необходимость правильной обработки переполнения (отбрасывание пакетов или блокировка)
Вывод: идеально подходят кольцевые буферы фиксированного размера.
Что нам рассказывают в учебниках
Для буферизации данных между оборудованием и ПО драйверы устройств используют очереди:
Связанные списки для гибкости
Блокировки для синхронизации
Динамическое распределение для буферов
Всё понятно и просто.
Проверка реальностью: почему стандартные очереди терпят неудачу
1. Связанные списки недружественны к кэшу
Каждое добавление в очередь/извлечение из очереди затрагивает несколько линий кэша:
// Добавление в очередь: 3 операции доступа к памяти buf->next = NULL; // Запись в буфер (промах кэша 1) q->tail->next = buf; // Запись в старый хвост (промах кэша 2) q->tail = buf; // Запись в голову очереди (промах кэша 3)
При 1 миллионе пакетов в секунду: 3 миллионов промахов кэша в секунду для одних только операций с очередями!
2. Спин-блокировки вызывают конкуренцию
Обработчик прерываний (производитель) и поток ядра (потребитель) получают доступ к ядру:
CPU 0 (interrupt): CPU 1 (thread): spin_lock(&q->lock) enqueue packet spin_lock(&q->lock) ← Spinning! spin_unlock(&q->lock) (waiting...) spin_lock acquired dequeue packet spin_unlock(&q->lock)
Результат: CPU 1 тратит циклы на ожидание, пока CPU 0 удерживает блокировку.
3. Динамическое распределение в контексте прерываний
// ПЛОХО: malloc в обработчике прерываний! rx_buffer_t *buf = malloc(sizeof(rx_buffer_t)); // Может уйти в сон!
Проблема: malloc может уходить в сон (ожидать память), однако этого не могут делать обработчики прерываний.
Решение: предварительно распределённые буферы.
Решение 1: кольцевой буфер без блокировок
Кольцевой буфер с атомарными указателями на голову/хвост устраняет блокировки:
#define RX_QUEUE_SIZE 1024 // Обязательно степень двойки typedef struct { rx_buffer_t *buffers[RX_QUEUE_SIZE]; atomic_uint head; // Индекс потребителя atomic_uint tail; // Индекс производителя } rx_ring_t; bool rx_ring_enqueue(rx_ring_t *ring, rx_buffer_t *buf) { uint32_t tail = atomic_load_explicit(&ring->tail, memory_order_relaxed); uint32_t next_tail = (tail + 1) & (RX_QUEUE_SIZE - 1); uint32_t head = atomic_load_explicit(&ring->head, memory_order_acquire); if (next_tail == head) { return false; // Очередь заполнена } ring->buffers[tail] = buf; atomic_store_explicit(&ring->tail, next_tail, memory_order_release); return true; } rx_buffer_t *rx_ring_dequeue(rx_ring_t *ring) { uint32_t head = atomic_load_explicit(&ring->head, memory_order_relaxed); uint32_t tail = atomic_load_explicit(&ring->tail, memory_order_acquire); if (head == tail) { return NULL; // Очередь пуста } rx_buffer_t *buf = ring->buffers[head]; atomic_store_explicit(&ring->head, (head + 1) & (RX_QUEUE_SIZE - 1), memory_order_release); return buf; }
Почему это работает:
Один производитель, один потребитель: не требуется CAS, достаточно атомарных загрузок/сохранений
Упорядочивание памяти: ACQUIRE/RELEASE обеспечивают видимость
Размер, равный степени двойки: деление по модулю становится побитовым AND (быстро!)
Бенчмарк
Тест: 1 миллион операций добавления в очередь/извлечения из очереди Связанный список со спин-блокировкой: Такты: 450 миллионов Промахи кэша: 18,5 миллиона Ожидание блокировки: 28,5% Время: 375 мс Кольцевой буфер без блокировок: Такты: 85 миллионов Промахи кэша: 2,8 миллиона Ожидание блокировки: 0% Время: 71 мс Ускорение: 5,3× Снижение промахов кэша: 6,6×
Решение 2: пул предварительно распределённых буферов
Вместо того, чтобы распределять буферы по запросу, будем распределять пул заранее:
#define BUFFER_POOL_SIZE 2048 typedef struct { rx_buffer_t buffers[BUFFER_POOL_SIZE]; rx_ring_t free_list; // Кольцевой буфер свободных буферов } buffer_pool_t; static buffer_pool_t g_buffer_pool; void buffer_pool_init(buffer_pool_t *pool) { rx_ring_init(&pool->free_list); // Добавляем все буферы в список свободных for (int i = 0; i < BUFFER_POOL_SIZE; i++) { rx_ring_enqueue(&pool->free_list, &pool->buffers[i]); } } rx_buffer_t *buffer_alloc(buffer_pool_t *pool) { return rx_ring_dequeue(&pool->free_list); } void buffer_free(buffer_pool_t *pool, rx_buffer_t *buf) { rx_ring_enqueue(&pool->free_list, buf); }
Преимущества:
Отсутствие malloc в прерывании: просто извлечение из свободного списка
Скорость: распределение/освобождение за O(1)
Ограничение на размер занимаемой памяти: фиксированный размер пула
Удобство для памяти: буферы хранятся в памяти друг за другом
Бенчмарк
Тест: 1 миллион распределений буферов в контексте прерываний context malloc/free: Такты: 200 миллионов (200 тактов на распределение) Промахи кэша: 12 миллионов Время: 167 мс Риск: может уйти в сон! Заранее распределённый пул: Такты: 5 миллионов (5 тактов на распределение) Промахи кэша: 1 миллион Время: 4,2 мс Ускорение: 40×
Решение 3: групповая обработка
Вместо того, чтобы обрабатывать по одному пакету за раз, будем выполнять групповую обработку
#define BATCH_SIZE 32 void process_rx_packets(rx_ring_t *ring) { rx_buffer_t *batch[BATCH_SIZE]; int count = 0; // Извлекаем группу из очереди while (count < BATCH_SIZE) { rx_buffer_t *buf = rx_ring_dequeue(ring); if (!buf) break; batch[count++] = buf; } // Обрабатываем группу for (int i = 0; i < count; i++) { process_packet(batch[i]); } // Освобождаем группу for (int i = 0; i < count; i++) { buffer_free(&g_buffer_pool, batch[i]); } }
Почему это помогает:
Амортизация оверхеда: оверхед одного цикла на 32 пакета
Повышение оптимизации использования кэша: обработка связанных пакетов вместе
Упреждающая выборка: CPU может заранее получать следующий пакет, пока обрабатывает текущий
Бенчмарк
Тест: обработка 1 миллиона пакетов По одному за раз: Такты: 850 миллионов Промахи кэша: 45 миллионов Время: 708 мс Групповая обработка (32 пакета): Такты: 520 миллионов Промахи кэша: 28 миллионов Время: 433 мс Ускорение: 1,6×
Решение 4: опрос в стиле NAPI
Linux NAPI (New API) при высоких нагрузках вместо прерываний использует опросы.
Проблема прерываний
При высокой частоте пакетов прерывания преобладают:
1 Гбит/с, 64-байтные пакеты: Частота пакетов: 1488095 пакетов/с Частота прерываний: 1488095 прерываний/с Оверхед прерываний: ~1000 тактов каждое Суммарно: 1,49 миллиарда тактов/с При 1,2 ГГц: 124% CPU! (невозможно)
Результат: система не поспевает и теряет пакеты.
Решение: снижение проблемы прерываний
typedef struct { rx_ring_t rx_ring; atomic_bool polling; int budget; // Максимальное количество пакетов, обрабатываемое за опрос } napi_context_t; // Обработчик прерываний void eth_interrupt_handler(void) { napi_context_t *napi = &g_napi; // Отключаем прерывания, начинаем опрашивать if (!atomic_exchange(&napi->polling, true)) { eth_disable_interrupts(); schedule_poll(napi); // Планируем опрос в softirq } } // Функция опроса (выполняется в контексте softirq) void eth_poll(napi_context_t *napi) { int processed = 0; while (processed < napi->budget) { rx_buffer_t *buf = rx_ring_dequeue(&napi->rx_ring); if (!buf) break; process_packet(buf); processed++; } // Если мы обработали меньше, чем бюджет, то снова включаем прерывания if (processed < napi->budget) { atomic_store(&napi->polling, false); eth_enable_interrupts(); } else { // Работы всё ещё много, снова планируем опрос schedule_poll(napi); } }
Как это работает:
Низкая нагрузка: используем прерывания (низкие задержки)
Высокая нагрузка: отключаем прерывания, выполняем групповые опросы (высокая пропускная способность)
Адаптивность: переключение между режимами в зависимости от нагрузки
Бенчмарк
Тест: 1 Гбит/с, 64-байтные пакеты (1.49 миллиона пакетов/с) На основе прерываний: Ресурсы CPU: 95% Потеря пакетов: 31% Пропускная способность: 690 Мбит/с NAPI-опросы (budget=64): Ресурсы CPU: 68% Потеря пакетов: 0,12% Пропускная способность: 1020 Мбит/с Улучшения: пропускная способность выросла в 1,48 раза, потребляется на 28% меньше ресурсов CPU
Пример из реального мира: skb_buff ядра Linux
Ядро Linux использует для сетевых пакетов sk_buff (socket-буфер).
Архитектура
struct sk_buff { struct sk_buff *next; struct sk_buff *prev; unsigned char *head; // Начало распределённого буфера unsigned char *data; // Начало самих данных unsigned char *tail; // Конец данных unsigned char *end; // Конец распределённого буфера unsigned int len; // Длина данных unsigned int data_len; // Длина данных по страницам // ... множество других полей };
Основные особенности:
Место в голове/хвосте: пространство до/после данных для заголовков
голова данные хвост конец | | | | v v v v [голова][сами данные][хвост]
Общие данные: несколько skb могут указывать на одни данные (копирование не требуется)
Slab-распределитель: предварительно распределённый пул skb
Почему такая система быстра
// Добавляем заголовок без копирования данных void add_header(struct sk_buff *skb, int header_len) { skb->data -= header_len; // Просто перемещаем указатель! skb->len += header_len; } // Удаляем заголовок void remove_header(struct sk_buff *skb, int header_len) { skb->data += header_len; // Просто перемещаем указатель! skb->len -= header_len; }
Никаких копирований памяти! Только арифметика указателей.
Производительность
Добавление/удаление заголовков (1 миллион операций): Через memcpy: Такты: 450 миллионов Время: 375 мс С арифметикой указателей (skb): Такты: 12 миллионов Время: 10 мс Ускорение: 37,5×
Соединяем всё вместе: оптимизированный сетевой драйвер
Ниже показан готовый оптимизированный драйвер, сочетающий в себе все техники:
#define RX_RING_SIZE 1024 #define BUFFER_POOL_SIZE 2048 #define NAPI_BUDGET 64 #define BATCH_SIZE 32 typedef struct { // Кольцевой буфер без блокировок rx_ring_t rx_ring; // Пул предварительно распределённых буферов buffer_pool_t buffer_pool; // Контекст NAPI atomic_bool polling; int budget; // Статистика atomic_uint packets_received; atomic_uint packets_dropped; } eth_driver_t; static eth_driver_t g_eth_driver; // Обработчик прерываний (производитель) void eth_interrupt_handler(void) { eth_driver_t *drv = &g_eth_driver; // Переключение в режим опроса if (!atomic_exchange(&drv->polling, true)) { eth_disable_interrupts(); schedule_softirq(eth_poll); } } // Функция опроса (выполняется в softirq) void eth_poll(void *arg) { eth_driver_t *drv = &g_eth_driver; int processed = 0; while (processed < NAPI_BUDGET) { // Проверка наличия пакета в оборудовании if (!eth_hw_has_packet()) break; // Распределение буфера из пула rx_buffer_t *buf = buffer_alloc(&drv->buffer_pool); if (!buf) { atomic_fetch_add(&drv->packets_dropped, 1); eth_hw_drop_packet(); continue; } // Получение пакета от оборудования buf->len = eth_hw_receive(buf->data, sizeof(buf->data)); // Занесение в кольцевой буфер if (!rx_ring_enqueue(&drv->rx_ring, buf)) { buffer_free(&drv->buffer_pool, buf); atomic_fetch_add(&drv->packets_dropped, 1); } else { atomic_fetch_add(&drv->packets_received, 1); } processed++; } // Если мы обработали меньше, чем заданный бюджет, снова включаем прерывания if (processed < NAPI_BUDGET) { atomic_store(&drv->polling, false); eth_enable_interrupts(); } else { // Работы всё ещё много, снова планируем опрос schedule_softirq(eth_poll); } } // Потребитель (поток ядра) void eth_process_packets(void) { eth_driver_t *drv = &g_eth_driver; rx_buffer_t *batch[BATCH_SIZE]; int count = 0; // Извлекаем группу из буфера while (count < BATCH_SIZE) { rx_buffer_t *buf = rx_ring_dequeue(&drv->rx_ring); if (!buf) break; batch[count++] = buf; } // Обработка группы for (int i = 0; i < count; i++) { process_packet(batch[i]->data, batch[i]->len); } // Освобождение группы for (int i = 0; i < count; i++) { buffer_free(&drv->buffer_pool, batch[i]); } }
Окончательный бенчмарк
Тест: Ethernet на 1 Гбит/с, 64-байтные пакеты (1,49 миллиона пакетов/с) Изначальный код (связанный список, спин-блокировка, прерывания): Пропускная способность: 599 Мбит/с Потеря пакетов: 31,1% Ресурсы CPU: 95% Промахи кэша: 18,5 миллиона/с Оптимизированный (кольцевой буфер, пул, NAPI, групповая обработка): Пропускная способность: 1020 Мбит/с Потеря пакетов: 0,12% Ресурсы CPU: 68% Промахи кэша: 2,8 миллиона/с Улучшения: Пропускная способность: больше в 1,70 раза (599 → 1020 Мбит/с) Потеря пакетов: меньше в 259 раз (31,1% → 0,12%) Ресурсы CPU: снижение потребления на 28% Промахи кэша: меньше в 6,6 раза
Подведём итог
Загадка терявшихся пакетов была решена. Потери сетевого драйвера упали с 31% всего до 0,12%, то есть мы получили 259-кратное улучшение. Пропускная способность повысилась с 599 Мбит/с до 1020 Мбит/с, превысив целевой 1 Гбит/с.
Ключевые выводы:
Кольцевые буферы без блокировок позволяют избавиться от конкуренции. Очередям с одним производителем и одним потребителем требуются только атомарные загрузки и сохранения без CAS. По сравнению с очередями на основе спин-блокировок скорость увеличилась в 5,3 раза.
Без пулов предварительно распределённых буферов не обойтись. При работе с пулами распределение в контексте прерываний происходит в 40 раз быстрее, чем при использовании malloc. Отсутствует риск ухода в сон.
Групповая обработка амортизирует оверхед. Обработка 32 пакетов за раз в 1,6 раз быстрее, чем по очереди. Оптимизируется использование кэша и упреждающая выборка.
При высоких нагрузках опросы в стиле NAPI побеждают прерывания. Адаптивная работа с прерываниями повышает пропускную способность в 1,48 раза и позволяет потреблять на 28% меньше ресурсов CPU.
Арифметика указателей побеждает memcpy. sk_buff Linux использует пространство голов/хвостов для добавления/удаления заголовков без копирования. Это в 37,5 раза быстрее, чем memcpy.
Показатели сетевого драйвера:
Кольцевой буфер без блокировок: в 5,3 раза быстрее, в 6,6 раза меньше промахов кэша
Предварительно распределённый пул: в 40 раз быстрее, чем malloc
Групповая обработка: в 1,6 раза быстрее
Опрос NAPI: пропускная способность в 1,48 больше, потребляется на 28% меньше ресурсов CPU
Суммарно: пропускная способность увеличилась в 1,70 раза, потеря пакетов снизилась в 259 раз
Драйверам устройств требуются предварительно распределённые и удобные для кэширования структуры данных без блокировок. При высокой частоте пакетов важен каждый такт.
В следующей главе: управление памятью прошивок — как управлять памятью во встраиваемых системах с ограниченными ресурсами.