Оказалось, что тема суммирования целых чисел в кодировке ASCII в Haswell со скоростью memcpy гораздо популярнее, чем я мог ожидать. Именно поэтому я решил поучаствовать и в другом челлендже в жанре HighLoad: подсчёт uint8. В настоящее время я занимаю всего лишь 13 место в списке лидеров, проигрываю первому месту около 7%, но уже узнал немало интересного. В этом посте я полностью опишу моё решение, в том числе, удивительный паттерн считывания из памяти. Используя его, можно примерно до 30% (по сравнению с обычным последовательным доступом) повысить скорость передачи в контексте одноядерных рабочих нагрузок, ограниченных размером кэша. По-видимому, этот метод малоизвестен.
Как и в других постах автора, программа настроена для следующих входных характеристик высоконагруженной системы: Intel Xeon E3-1271 v3 @ 3,60 ГГц, ОЗУ 512 МБ, Ubuntu 20.04. В ней используется только AVX2, а AVX512 не используется.
Задача
«Выведите на экран, сколько байт соответствует значению 127 в файле размером 250 МБ, который полон байт, равномерно выбранных из диапазона [0, 255] и отправленных в стандартный вывод»
К следующему просто нечего добавить! Решение, которое мы представим, работает в 550 раз быстрее следующей тривиальной программы.
uint64_t count = 0;
for (uint8_t v; std::cin >> v;) {
if (v == 127) {
++count;
}
}
std::cout << count << std::endl;
return 0;
Ядро
Весь исходный код решения приведён в конце этого поста. Но сначала я пошагово объясню, как он работает. Ядро состоит всего из трёх инструкций, поэтому сразу перехожу к блоку __asm__
(извините!).
; rax — это основание ввода
; rsi — это смещение до актуального фрагмента
vmovntdqa (%rax, %rsi, 1), %ymm4
; ymm2 — это вектор, заполненный 127
vpcmpeqb %ymm4, %ymm2, %ymm4
; ymm6 — это аккумулятор, байты которого соответствуют
; текущему счёту экземпляров 127
; на данной позиции во входном фрагменте
vpsubb %ymm4, %ymm6, %ymm6
При помощи этого кода мы перебираем 32-байтные фрагменты ввода и:
Загружаем фрагмент с
vmovntdqa
(это инструкция перемещения, записываемая в память, минуя кэш, вставлена только в стилистических целях и во время выполнения роли не играет).Каждый байт во фрагменте сравниваем со 127 при помощи
vpcmpeqb
, что даёт нам0xFF
(оно же -1), и этот байт соответствует 127, а все остальные —0x00
. Например,[125, 126, 127, 128, ...] принимает вид [0, 0, -1, 0, ...].Вычитаем результат сравнения из аккумулятора. Продолжая предыдущий пример и предполагая, что аккумулятор у нас заполнен нулями, получаем [0, 0, 1, 0, ...].
Затем, чтобы не допустить переполнения этого узкого аккумулятора, мы время от времени дампируем его в более широкий при помощи следующего кода:
; ymm1 — это нулевой вектор
; ymm6 — это узкий аккумулятор
vpsadbw %ymm1,%ymm6,%ymm6
; ymm3 — это широкий аккумулятор
vpaddq %ymm3,%ymm6,%ymm3
vpsadbw
суммирует в аккумуляторе каждые восемь байт, получая из них четыре 64-разрядных числа, после чего vpadddq
суммирует результат с более широким аккумулятором, в котором переполнения гарантированно не произойдёт. В конце работы мы извлекаем результат, чтобы получить окончательный счёт.
Пока ничего экстраординарного. На самом деле, именно такой подход описан в следующей дискуссии на StackOverflow: How to count character occurrences using SIMD.
Начинается волшебство
Сложность этой задачи в том, что вычислений в её рамках очень мало, но они очень ограничены по памяти. Я проштудировал мануал по оптимизации от Intel (там полно опечаток) в поисках нужных мне данных о памяти, пока на странице 788 не встретил рассказ о 4 аппаратных префетчерах (механизмах предвыборки инструкций). Создавалось впечатление, как будто три из них полезны только при последовательном доступе (которым я уже занимался), но в одном, который называется «Streamer», нашёлся интересный нюанс:
«Фиксирует и ведёт до 32 потоков операций доступа к данным. Для каждой 4-килобайтной страницы можно вести один прямой и один обратный поток».
«Для каждой 4-килобайтной страницы». Улавливаете суть? Можно не обрабатывать последовательно весь вывод, а перемежать обработку 4-килобайтных страниц, следующих друг за другом. Также мы немного разматываем ядро и обрабатываем в каждом блоке целую кэш-линию (2x32 байт).
#define BLOCK(offset) \
"vmovntdqa " #offset " * 4096 (%6, %2, 1), %4\n\t" \
"vpcmpeqb %4, %7, %4\n\t" \
"vmovntdqa " #offset " * 4096 + 0x20 (%6, %2, 1), %3\n\t" \
"vpcmpeqb %3, %7, %3\n\t" \
"vpsubb %4, %0, %0\n\t" \
"vpsubb %3, %1, %1\n\t" \
8 из них мы помещаем в главный цикл, где offset устанавливается в размере от 0 до 7 включительно.
В таком случае балл на HighLoad увеличивается примерно на 15%, но, если ваше ядро ещё сильнее ограничено по памяти — допустим, вы просто складываете байты при помощи vpaddb
, чтобы найти их сумму по модулю 255, на этом можно выиграть до 30%. Впечатляет, учитывая, насколько это простое изменение!
В любом случае, есть ещё одна маленькая деталь: мы добавляем предвыборку четырёх ближайших кэш-линий:
#define BLOCK(offset) \
"vmovntdqa " #offset " * 4096 (%6, %2, 1), %4\n\t" \
"vpcmpeqb %4, %7, %4\n\t" \
"vmovntdqa " #offset " * 4096 + 0x20 (%6, %2, 1), %3\n\t" \
"vpcmpeqb %3, %7, %3\n\t" \
"vpsubb %4, %0, %0\n\t" \
"vpsubb %3, %1, %1\n\t" \
"prefetcht0 " #offset " * 4096 + 4 * 64 (%6, %2, 1)\n\t"
Почему именно 4 кэш-линии? Не могу внятно ответить на этот вопрос, просто так работает лучше. На следующем графике показано, как исполняется программа при таком решении, причём, в графике заложены шаги предвыборки от 0 до 100 на иной системе (вот почему оптимум здесь сдвинут). Как видите, кривая довольно сложна.
Исходный код
#include <iostream>
#include <cstdint>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <immintrin.h>
#include <cassert>
#define BLOCK_COUNT 8
#define PAGE_SIZE 4096
#define TARGET_BYTE 127
#define BLOCKS_8 \
BLOCK(0) BLOCK(1) BLOCK(2) BLOCK(3) \
BLOCK(4) BLOCK(5) BLOCK(6) BLOCK(7)
#define BLOCK(offset) \
"vmovntdqa " #offset "*4096(%6,%2,1),%4\n\t" \
"vpcmpeqb %4,%7,%4\n\t" \
"vmovntdqa " #offset "*4096+0x20(%6,%2,1),%3\n\t" \
"vpcmpeqb %3,%7,%3\n\t" \
"vpsubb %4,%0,%0\n\t" \
"vpsubb %3,%1,%1\n\t" \
"prefetcht0 " #offset "*4096+4*64(%6,%2,1)\n\t"
static inline
__m256i hsum_epu8_epu64(__m256i v) {
return _mm256_sad_epu8(v, _mm256_setzero_si256());
}
int main() {
struct stat sb;
assert(fstat(STDIN_FILENO, &sb) != -1);
size_t length = sb.st_size;
char* start = static_cast<char*>(mmap(nullptr, length, PROT_READ, MAP_PRIVATE | MAP_POPULATE, STDIN_FILENO, 0));
assert(start != MAP_FAILED);
uint64_t count = 0;
__m256i sum64 = _mm256_setzero_si256();
size_t offset = 0;
__m256i compare_value = _mm256_set1_epi8(TARGET_BYTE);
__m256i acc1 = _mm256_set1_epi8(0);
__m256i acc2 = _mm256_set1_epi8(0);
__m256i temp1, temp2;
while (offset + BLOCK_COUNT*PAGE_SIZE <= length) {
int batch = PAGE_SIZE / 64;
asm volatile(
".align 16\n\t"
"0:\n\t"
BLOCKS_8
"add $0x40, %2\n\t"
"dec %5\n\t"
"jg 0b"
: "+x" (acc1), "+x" (acc2), "+r" (offset), "+x" (temp1), "+x" (temp2), "+r" (batch)
: "r" (start), "x" (compare_value)
: "cc", "memory"
);
offset += (BLOCK_COUNT - 1)*PAGE_SIZE;
sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc1));
sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc2));
acc1 = _mm256_set1_epi8(0);
acc2 = _mm256_set1_epi8(0);
}
sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc1));
sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc2));
count += _mm256_extract_epi64(sum64, 0);
count += _mm256_extract_epi64(sum64, 1);
count += _mm256_extract_epi64(sum64, 2);
count += _mm256_extract_epi64(sum64, 3);
for (; offset < length; ++offset) {
if (start[offset] == TARGET_BYTE) {
++count;
}
}
std::cout << count << std::endl;
return 0;
}
Заключение
Удивительно, насколько обойдён вниманием паттерн с использованием перемежающихся страниц. Насколько помню, никогда не встречал его в реальной практике. Любопытно! Если вам доводилось с ним сталкиваться, расскажите об этом. А если я забыл ещё о каких-то вариантах оптимизации памяти, сообщите о них тоже.
Комментарии (10)
NickDoom
05.11.2024 22:23Мдас… чтобы ассемблер уверенно обходил все эти «умные компиляции», в наше время надо прямо уже конкретно жестить :)
С другой стороны — такого внимания требует 1% кода (но рискующий сожрать 99% времени выполнения). Тут можно и расщедриться. В этом плане ничего не изменилось :)
sena
05.11.2024 22:23Решение, которое мы представим, работает в 550 раз быстрее следующей тривиальной программы.
Не, я конечно понимаю, что надо привлечь внимание, но это уже перебор. Если хоть как-то интересует производительность, это самый медленный способ читать из файла.
Если использовать обыкновенный read, то наивная реализация работает всего лишь в 5 раз медленнее оптимизированной. И это, конечно, тоже очень неплохо, но сенсации уже не получится.
#include <unistd.h> #include <iostream> int main(int argc, char **argv) { uint64_t count = 0; size_t bufsize = 1024*4*256; uint8_t buf[bufsize]; while(true) { ssize_t nread = read(0, buf, bufsize); if(nread <= 0) break; for(unsigned ii = 0; ii < nread; ++ii) { if (buf[ii] == 127) { ++count; } } } std::cout << count << std::endl; return 0; }
$ g++ -mavx2 -O3 simddumb.cc -o simddumb $ time ./simddumb < testf.bin 53616490 real 0m3,660s user 0m2,474s sys 0m1,185s
$ time ./simd < testf.bin 53616490 real 0m2,150s user 0m0,500s sys 0m1,650s
Размер testf.bin 13GB
mmap это конечно уже не совсем наивная реализация, но надо для начала испытать с mmap и прочими оптимизациями, перед нырянием в ассемблер
Zara6502
05.11.2024 22:23Зарегистрировался на сервисе, написал "свой" вариант на C#
int count = 0; using (var stdin = Console.OpenStandardInput()) { byte[] buffer = new byte[32768]; while (stdin.Read(buffer, 0, buffer.Length) > 0) { for(int c = 0; c < buffer.Length; c++) if (buffer[c] == 127) count++; } } Console.WriteLine(count);
и получил ошибку
Немного поизменял код, позапускал еще три раза и всегда ожидаемое и полученное значения различаются то на 4 то на 65 то на 74 байта.
и как узнать почему насчиталось не столько сколько они ожидают? исходный файл для подсчёта у них, как я понял, всегда разный (что странно) и скачать его нельзя.
у меня локально всё считает верно.
Deosis
05.11.2024 22:23При последнем чтении буфер может заполниться не до конца, и часть байтов с прошлой итерации будут обработаны ещё раз.
Zara6502
05.11.2024 22:23ну я балда конечно
int count = 0; using (var stdin = Console.OpenStandardInput()) { byte[] buffer = new byte[32768]; int bytes = 0; while ((bytes = stdin.Read(buffer, 0, buffer.Length)) > 0) { for(int c = 0; c < bytes; c++) if (buffer[c] == 127) count++; } } Console.WriteLine(count);
всё получилось!
Кстати пообщался с разработчиками в телеге, весьма дружелюбно и позитивно, интересный сайт.
Кстати по решению задачи:
Best score: 13 260 (на C++)
Моё очевидное из текста выше: 204 614 (на C#)
Базовый пример на C++: 8 922 064
То есть добавление буфера для чтения ускорило всё в 43.6 раза.
voldemar_d
05.11.2024 22:23В ней используется только AVX2, а AVX512 не используется.
А если использовать AVX512, то ещё в разы быстрее станет?
sergio_nsk
В итоге x500 прирост за счёт того, что
std::cin
заменил наmmap
и x1,1 за счёт остального. Причём и первая замена несправедлива: в файле текст 122 127 89 ..., а в решении двоичный файл с байтами. Для справедливости переводчик мог бы показать прирост скорости только от заменыstd::cin
с текстовым файлом наmmap
с двоичным файломsena
оптимизированное решение работает со стандартным вводом
mOlind
Вы можете посмотреть условие и проверить свое решение на https://highload.fun/tasks/5 . там бинарные данные идут на вход. Замена std::cin на mmap дает x250 ускорение. А дальше волшебство и танцы с бубном, чтобы приблизиться к лидеру.