Оказалось, что тема суммирования целых чисел в кодировке ASCII в Haswell со скоростью memcpy гораздо популярнее, чем я мог ожидать. Именно поэтому я решил поучаствовать и в другом челлендже в жанре HighLoad: подсчёт uint8. В настоящее время я занимаю всего лишь 13 место в списке лидеров, проигрываю первому месту около 7%, но уже узнал немало интересного. В этом посте я полностью опишу моё решение, в том числе, удивительный паттерн считывания из памяти. Используя его, можно примерно до 30% (по сравнению с обычным последовательным доступом) повысить скорость передачи в контексте одноядерных рабочих нагрузок, ограниченных размером кэша. По-видимому, этот метод малоизвестен.

Как и в других постах автора, программа настроена для следующих входных характеристик высоконагруженной системы: Intel Xeon E3-1271 v3 @ 3,60 ГГц, ОЗУ 512 МБ, Ubuntu 20.04. В ней используется только AVX2, а AVX512 не используется.

Задача

«Выведите на экран, сколько байт соответствует значению 127 в файле размером 250 МБ, который полон байт, равномерно выбранных из диапазона [0, 255] и отправленных в стандартный вывод»

К следующему просто нечего добавить! Решение, которое мы представим, работает в 550 раз быстрее следующей тривиальной программы.

uint64_t count = 0;
for (uint8_t v; std::cin >> v;) {
    if (v == 127) {
        ++count;
    }
}

std::cout << count << std::endl;
return 0;

Ядро

Весь исходный код решения приведён в конце этого поста. Но сначала я пошагово объясню, как он работает. Ядро состоит всего из трёх инструкций, поэтому сразу перехожу к блоку __asm__ (извините!).

; rax — это основание ввода
; rsi — это смещение до актуального фрагмента
vmovntdqa    (%rax, %rsi, 1), %ymm4
; ymm2 — это вектор, заполненный 127
vpcmpeqb     %ymm4, %ymm2, %ymm4
; ymm6 — это аккумулятор, байты которого соответствуют 
; текущему счёту экземпляров 127
; на данной позиции во входном фрагменте 
vpsubb       %ymm4, %ymm6, %ymm6

При помощи этого кода мы перебираем 32-байтные фрагменты ввода и:

  • Загружаем фрагмент с vmovntdqa (это инструкция перемещения, записываемая в память, минуя кэш, вставлена только в стилистических целях и во время выполнения роли не играет).

  • Каждый байт во фрагменте сравниваем со 127 при помощи vpcmpeqb, что даёт нам 0xFF (оно же -1), и этот байт соответствует 127, а все остальные — 0x00. Например,[125, 126, 127, 128, ...] принимает вид [0, 0, -1, 0, ...].

  • Вычитаем результат сравнения из аккумулятора. Продолжая предыдущий пример и предполагая, что аккумулятор у нас заполнен нулями, получаем [0, 0, 1, 0, ...].

Затем, чтобы не допустить переполнения этого узкого аккумулятора, мы время от времени дампируем его в более широкий при помощи следующего кода:

; ymm1 — это нулевой вектор
; ymm6 — это узкий аккумулятор
vpsadbw      %ymm1,%ymm6,%ymm6
; ymm3 — это широкий аккумулятор
vpaddq       %ymm3,%ymm6,%ymm3

vpsadbw суммирует в аккумуляторе каждые восемь байт, получая из них четыре 64-разрядных числа, после чего vpadddq суммирует результат с более широким аккумулятором, в котором переполнения гарантированно не произойдёт. В конце работы мы извлекаем результат, чтобы получить окончательный счёт.

Пока ничего экстраординарного. На самом деле, именно такой подход описан в следующей дискуссии на StackOverflow: How to count character occurrences using SIMD.

Начинается волшебство

Сложность этой задачи в том, что вычислений в её рамках очень мало, но они очень ограничены по памяти. Я проштудировал мануал по оптимизации от Intel (там полно опечаток) в поисках нужных мне данных о памяти, пока на странице 788 не встретил рассказ о 4 аппаратных префетчерах (механизмах предвыборки инструкций). Создавалось впечатление, как будто три из них полезны только при последовательном доступе (которым я уже занимался), но в одном, который называется «Streamer», нашёлся интересный нюанс:

«Фиксирует и ведёт до 32 потоков операций доступа к данным. Для каждой 4-килобайтной страницы можно вести один прямой и один обратный поток».

«Для каждой 4-килобайтной страницы». Улавливаете суть? Можно не обрабатывать последовательно весь вывод, а перемежать обработку 4-килобайтных страниц, следующих друг за другом. Также мы немного разматываем ядро и обрабатываем в каждом блоке целую кэш-линию (2x32 байт).

#define BLOCK(offset) \
    "vmovntdqa    " #offset " * 4096 (%6, %2, 1), %4\n\t" \
    "vpcmpeqb     %4, %7, %4\n\t" \
    "vmovntdqa    " #offset " * 4096 + 0x20 (%6, %2, 1), %3\n\t" \
    "vpcmpeqb     %3, %7, %3\n\t" \
    "vpsubb       %4, %0, %0\n\t" \
    "vpsubb       %3, %1, %1\n\t" \

8 из них мы помещаем в главный цикл, где offset устанавливается в размере от 0 до 7 включительно.

В таком случае балл на HighLoad увеличивается примерно на 15%, но, если ваше ядро ещё сильнее ограничено по памяти — допустим, вы просто складываете байты при помощи vpaddb, чтобы найти их сумму по модулю 255, на этом можно выиграть до 30%. Впечатляет, учитывая, насколько это простое изменение!

В любом случае, есть ещё одна маленькая деталь: мы добавляем предвыборку четырёх ближайших кэш-линий:

#define BLOCK(offset) \
    "vmovntdqa    " #offset " * 4096 (%6, %2, 1), %4\n\t" \
    "vpcmpeqb     %4, %7, %4\n\t" \
    "vmovntdqa    " #offset " * 4096 + 0x20 (%6, %2, 1), %3\n\t" \
    "vpcmpeqb     %3, %7, %3\n\t" \
    "vpsubb       %4, %0, %0\n\t" \
    "vpsubb       %3, %1, %1\n\t" \
    "prefetcht0   " #offset " * 4096 + 4 * 64 (%6, %2, 1)\n\t"

Почему именно 4 кэш-линии? Не могу внятно ответить на этот вопрос, просто так работает лучше. На следующем графике показано, как исполняется программа при таком решении, причём, в графике заложены шаги предвыборки от 0 до 100 на иной системе (вот почему оптимум здесь сдвинут). Как видите, кривая довольно сложна.

Вверху: минимальное время выполнения в сравнении с шагом предвыборки. Внизу: шаг предвыборки
Вверху: минимальное время выполнения в сравнении с шагом предвыборки. Внизу: шаг предвыборки

Исходный код

#include <iostream>
#include <cstdint>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <immintrin.h>
#include <cassert>

#define BLOCK_COUNT 8
#define PAGE_SIZE 4096
#define TARGET_BYTE 127

#define BLOCKS_8 \
    BLOCK(0)  BLOCK(1)  BLOCK(2)  BLOCK(3) \
    BLOCK(4)  BLOCK(5)  BLOCK(6)  BLOCK(7)

#define BLOCK(offset) \
    "vmovntdqa    " #offset "*4096(%6,%2,1),%4\n\t" \
    "vpcmpeqb     %4,%7,%4\n\t" \
    "vmovntdqa    " #offset "*4096+0x20(%6,%2,1),%3\n\t" \
    "vpcmpeqb     %3,%7,%3\n\t" \
    "vpsubb       %4,%0,%0\n\t" \
    "vpsubb       %3,%1,%1\n\t" \
    "prefetcht0   " #offset "*4096+4*64(%6,%2,1)\n\t"


static inline
__m256i hsum_epu8_epu64(__m256i v) {
    return _mm256_sad_epu8(v, _mm256_setzero_si256());
}

int main() {
    struct stat sb;
    assert(fstat(STDIN_FILENO, &sb) != -1);
    size_t length = sb.st_size;

    char* start = static_cast<char*>(mmap(nullptr, length, PROT_READ, MAP_PRIVATE | MAP_POPULATE, STDIN_FILENO, 0));
    assert(start != MAP_FAILED);

    uint64_t count = 0;
    __m256i sum64 = _mm256_setzero_si256();
    size_t offset = 0;

    __m256i compare_value = _mm256_set1_epi8(TARGET_BYTE);
    __m256i acc1 = _mm256_set1_epi8(0);
    __m256i acc2 = _mm256_set1_epi8(0);
    __m256i temp1, temp2;

    while (offset + BLOCK_COUNT*PAGE_SIZE <= length) {
        int batch = PAGE_SIZE / 64;
        asm volatile(
            ".align 16\n\t"
            "0:\n\t"

            BLOCKS_8

            "add          $0x40, %2\n\t"
            "dec          %5\n\t"
            "jg           0b"
            : "+x" (acc1), "+x" (acc2), "+r" (offset), "+x" (temp1), "+x" (temp2), "+r" (batch)
            : "r" (start), "x" (compare_value)
            : "cc", "memory"
        );

        offset += (BLOCK_COUNT - 1)*PAGE_SIZE;

        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc1));
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc2));

        acc1 = _mm256_set1_epi8(0);
        acc2 = _mm256_set1_epi8(0);
    }

    sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc1));
    sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc2));

    count += _mm256_extract_epi64(sum64, 0);
    count += _mm256_extract_epi64(sum64, 1);
    count += _mm256_extract_epi64(sum64, 2);
    count += _mm256_extract_epi64(sum64, 3);

    for (; offset < length; ++offset) {
        if (start[offset] == TARGET_BYTE) {
            ++count;
        }
    }

    std::cout << count << std::endl;
    return 0;
}

Заключение

Удивительно, насколько обойдён вниманием паттерн с использованием перемежающихся страниц. Насколько помню, никогда не встречал его в реальной практике. Любопытно! Если вам доводилось с ним сталкиваться, расскажите об этом. А если я забыл ещё о каких-то вариантах оптимизации памяти, сообщите о них тоже.

Комментарии (10)


  1. sergio_nsk
    05.11.2024 22:23

    В итоге x500 прирост за счёт того, что std::cin заменил на mmap и x1,1 за счёт остального. Причём и первая замена несправедлива: в файле текст 122 127 89 ..., а в решении двоичный файл с байтами. Для справедливости переводчик мог бы показать прирост скорости только от замены std::cin с текстовым файлом на mmap с двоичным файлом


    1. sena
      05.11.2024 22:23

      оптимизированное решение работает со стандартным вводом


    1. mOlind
      05.11.2024 22:23

      Вы можете посмотреть условие и проверить свое решение на https://highload.fun/tasks/5 . там бинарные данные идут на вход. Замена std::cin на mmap дает x250 ускорение. А дальше волшебство и танцы с бубном, чтобы приблизиться к лидеру.


  1. NickDoom
    05.11.2024 22:23

    Мдас… чтобы ассемблер уверенно обходил все эти «умные компиляции», в наше время надо прямо уже конкретно жестить :)

    С другой стороны — такого внимания требует 1% кода (но рискующий сожрать 99% времени выполнения). Тут можно и расщедриться. В этом плане ничего не изменилось :)


  1. sena
    05.11.2024 22:23

    Решение, которое мы представим, работает в 550 раз быстрее следующей тривиальной программы.

    Не, я конечно понимаю, что надо привлечь внимание, но это уже перебор. Если хоть как-то интересует производительность, это самый медленный способ читать из файла.

    Если использовать обыкновенный read, то наивная реализация работает всего лишь в 5 раз медленнее оптимизированной. И это, конечно, тоже очень неплохо, но сенсации уже не получится.

    #include <unistd.h>
    #include <iostream>
    
    int main(int argc, char **argv)
    {
    
    uint64_t count = 0;
    size_t bufsize = 1024*4*256;
    uint8_t buf[bufsize];
    while(true) {
      ssize_t nread = read(0, buf, bufsize);
      if(nread <= 0) break;
      for(unsigned ii = 0; ii < nread; ++ii) {
        if (buf[ii] == 127) {
            ++count;
        }
      }
    }
    std::cout << count << std::endl;
    return 0;
    }
    $ g++   -mavx2 -O3 simddumb.cc -o simddumb
    $ time ./simddumb < testf.bin 
    53616490
    
    real	0m3,660s
    user	0m2,474s
    sys	0m1,185s
    $ time ./simd < testf.bin 
    53616490
    
    real	0m2,150s
    user	0m0,500s
    sys	0m1,650s

    Размер testf.bin 13GB

    mmap это конечно уже не совсем наивная реализация, но надо для начала испытать с mmap и прочими оптимизациями, перед нырянием в ассемблер


  1. Tsimur_S
    05.11.2024 22:23

    Похоже на челендж highload fizzbuzz с кодгольфа.


  1. Zara6502
    05.11.2024 22:23

    Зарегистрировался на сервисе, написал "свой" вариант на C#

    int count = 0;
    using (var stdin = Console.OpenStandardInput())
    {
      byte[] buffer = new byte[32768];
      while (stdin.Read(buffer, 0, buffer.Length) > 0)
      {
         for(int c = 0; c < buffer.Length; c++)
             if (buffer[c] == 127) count++;
      }
    }
    Console.WriteLine(count);

    и получил ошибку

    Немного поизменял код, позапускал еще три раза и всегда ожидаемое и полученное значения различаются то на 4 то на 65 то на 74 байта.

    и как узнать почему насчиталось не столько сколько они ожидают? исходный файл для подсчёта у них, как я понял, всегда разный (что странно) и скачать его нельзя.

    у меня локально всё считает верно.


    1. Deosis
      05.11.2024 22:23

      При последнем чтении буфер может заполниться не до конца, и часть байтов с прошлой итерации будут обработаны ещё раз.


      1. Zara6502
        05.11.2024 22:23

        ну я балда конечно

        int count = 0;
        using (var stdin = Console.OpenStandardInput())
        {
          byte[] buffer = new byte[32768];
          int bytes = 0;
          while ((bytes = stdin.Read(buffer, 0, buffer.Length)) > 0)
          {
             for(int c = 0; c < bytes; c++)
                 if (buffer[c] == 127) count++;
          }
        }
        Console.WriteLine(count);

        всё получилось!

        Кстати пообщался с разработчиками в телеге, весьма дружелюбно и позитивно, интересный сайт.

        Кстати по решению задачи:

        Best score: 13 260 (на C++)

        Моё очевидное из текста выше: 204 614 (на C#)

        Базовый пример на C++: 8 922 064

        То есть добавление буфера для чтения ускорило всё в 43.6 раза.


  1. voldemar_d
    05.11.2024 22:23

    В ней используется только AVX2, а AVX512 не используется.

    А если использовать AVX512, то ещё в разы быстрее станет?