Всем добрый день.


Недавно на Хабре появилась статья Побеждая C двадцатью строками Haskell: пишем свой wc от @0xd34df00d. Автор, известный своей симпатией к функциональному программированию, реализовал на Хаскеле аналог утилиты wc, и подверг его оптимизации, получив в результате вариант, работающий более чем в 7 раз быстрее стандартной юниксовой утилиты.


Будучи добросовестным "бенчмаркером", 0xd34df00d в конце статьи указал, что его вариант работает с сильно упрощенным ТЗ по сравнению с оригиналом (например, был проигнорирован Unicode), и следовательно его превосходство в производительности ничего по факту не означает. Но ясное дело, что несмотря на это формат статьи породил множество споров в комментариях (да я и сам часто не читаю дальше заголовка, чего греха таить). Под статьей yleo привел результаты тестирования написанного им тривиального решения на C (под упрощенное условие), показав, что даже наивная C-реализация обходит оптимизированный Haskell-вариант. После этого господа принялись кидаться друг в друга какомментариями, в которых придирались к мелким различиям возможностей их вариантов (это исключительно мое личное видение ситуации).


Спустя некоторое время вышла статья товарища chapuza, в которой была представлена реализация упрощенного wc на Elixir, также работающая быстрее, чем стандартная. В комментариях продолжилась дискуссия из первого поста, хотя на мой взгляд былой запал был утрачен.


В данной статье мы не будем пытаться победить один язык/парадигму программирования другим/другой. Думаю, непредвзятому человеку понятно, что C намного гибче в плане ручной оптимизации, чем Haskell и большинство других языков программирования, и поэтому код на C хоть и может быть медленнее чем на Haskell, но также может быть и намного быстрее — все зависит от того, кто этот код пишет. С другой стороны, лично я вижу много пока что не раскрытых возможностей функциональной парадигмы, и не исключаю возможности, что когда-то именно функциональный код будет оптимизироваться настолько круто, что написание подобного аналога на C будет слишком затратным занятием. Но даже если и так, это время еще не пришло.


В то же время, будучи человеком, чью любовь к оптимизации (даже бессмысленной) я оцениваю как фанатическую, я сразу заинтересовался насколько можно оптимизировать тривиальный C-вариант. Этим мы и займемся. Думаю, на волне предыдущих статей это будет интересно, к тому же, сам 0xd34df00d в комментариях к своей статье выразил интерес к подсчету слов с помощью "simd-магии".


Тестовый стенд


Итак, для начала представим условия, в которых мы будем считать слова.


#include <stdint.h>
#include <stdio.h>
#include <malloc.h>
#include <Windows.h>
#include <intrin.h>

using namespace std;

struct stats
{
    uint32_t char_count;
    uint32_t line_count;
    uint32_t word_count;
};

void word_count( unsigned char* const str, uint32_t size, stats& result );

int main( )
{
    FILE* f;
    fopen_s( &f, "input.txt", "rb" );
    fseek( f, 0, SEEK_END );
    long size = ftell( f );
    rewind( f );

    long alloc_size = size + 63 - ( size + 63 ) % 64;
    unsigned char* const data = (unsigned char*)_aligned_malloc( alloc_size, 64 );
    fread( data, 1, size, f );
    fclose( f );

    LARGE_INTEGER fr, t0, t1;
    QueryPerformanceFrequency( &fr );
    QueryPerformanceCounter( &t0 );

    stats result;
    word_count2( data, size, result );

    QueryPerformanceCounter( &t1 );

    _aligned_free( data );

    double const time = (double)( t1.QuadPart - t0.QuadPart ) / (double)fr.QuadPart;
    printf(
        "Words: %d; lines: %d; chars: %d\nElapsed time: %lfms\n",
        result.word_count, result.line_count, result.char_count, time * 1000.0
    );
    system( "pause" );
}

Для начала — да, я буду использовать Windows и Visual Studio. К сожалению, мой уровень владения Linux'ом приближается к нулю, и писать Linux-программы значило бы потратить значительно больше усилий. С другой стороны, мы ведь ни с кем не соревнуемся — так что пусть будет уютная VS.


Как видите, все максимально просто. Мы считываем файл с диска, предварительно узнав его размер и выделив под него память. Память выделяем выровненную по 64 байта и также дополняем размер аллокации до 64 байт — пригодится для SIMD-инструкций, да и вообще не повредит. Далее вызываем функцию подсчета слов, время выполнения которой трекаем с помощью таймеров высокой точности. (Стоило бы использовать средства стандартной библиотеки, но этот метод мне привычней, к тому же, не приходится сомневаться в его точности.) Далее просто выводим результат обработки и затраченное время в миллисекундах, не забывая освободить память.


Будем считать, что во входном файле не используется Unicode, и что пробельными являются символы код которых меньше либо равен 32. Последнее ограничение можно заменить на более корректное, но тогда результаты возможно будут другими. К тому же, мне кажется что подсчет слов обычно используется на данных, где такое условие будет работать удовлетворительно.


Тестирование проводилось на компьютере с новеньким Ryzen 5 3600 c оперативной памятью 2x8 Gb 3200 MHz. Ах да, входной файл использовался тот же, что и в предыдущих статьях.


Тривиальная реализация


Мой тривиальный вариант следующий:


void word_count( unsigned char* const str, uint32_t size, stats& result )
{
    uint32_t lines = 1;
    uint32_t words = 0;

    bool was_space = false;

    for ( uint32_t i = 0; i < size; ++i )
    {
        unsigned char const c = str[i];
        bool const is_space = c <= ' ';
        lines += ( c == '\n' ) ? 1 : 0;
        words += ( is_space && !was_space ) ? 1 : 0;
        was_space = is_space;
    }

    words += !was_space ? 1 : 0;

    result.char_count = size;
    result.line_count = lines;
    result.word_count = words;
}

По сути он практически ничем не отличается от уже приведенного в комментариях к статье 0xd34df00d. Результаты работы:


Words: 44774640; lines: 15000010; chars: 1871822228
Elapsed time: 1825.704800ms

(SIMD-магия выходит на сцену)


Итак, что мы видим? Видим то, что каждый символ строки по сути обрабатывается независимо (да, есть зависимость от предыдущего символа, но не от результата его обработки). Так почему бы нам не обрабатывать по 16 символов одновременно с помощью SSE-инструкций?


Вариант с SSE:


void word_count_sse( unsigned char* const str, uint32_t size, stats& result )
{
    uint32_t lines = 1;
    uint32_t words = 0;

    bool was_space = false;

    for ( uint32_t i = 0; i < size; i += 16 )
    {
        __m128i const c = _mm_load_si128( (__m128i*)( str + i ) );
        uint32_t is_newline_mask = _mm_movemask_epi8(
            _mm_cmpeq_epi8( c, _mm_set1_epi8( '\n' ) )
        );

        uint32_t is_space_mask = _mm_movemask_epi8(
            _mm_andnot_si128( c, _mm_cmplt_epi8( c, _mm_set1_epi8( ' ' + 1 ) ) )
        );
        uint32_t was_space_mask = ( was_space ? 1 : 0 ) | ( is_space_mask << 1 );

        lines += __popcnt( is_newline_mask );
        words += __popcnt( is_space_mask & ~was_space_mask );
        was_space = is_space_mask & 0x8000;
    }

    words += !was_space ? 1 : 0;

    result.char_count = size;
    result.line_count = lines;
    result.word_count = words;
}

Думаю стоит объяснить что тут происходит. У нас все еще есть флаг was_space, который означает был ли предыдущий символ пробельным. Теперь, поскольку мы обрабатываем 16 символов за раз, этот флаг сохраняет информацию о 15-ом символе из предыдущей партии, так как именно это нам важно для первого символа из текущей.


В цикле мы сначала загружаем данные с помощью интринсика _mm_load_si128 в переменную типа __m128i (в ассемблере она станет 128-битным XMM-регистром).


Далее мы проводим тесты символов на то что они являются пробельными или символами перевода строки. Наша цель — получить 16-битные маски результата, в котором каждый бит отвечает за соответствующий символ в 128-битном регистре. SSE-инструкции сравнения работают только с XMM-регистрами, и для того что бы получить 16-битную маску мы используем интринсик _mm_movemask_epi8. Эта инструкция делает именно то что нам нужно — принимает на вход 128-битную переменную, интерпретирует ее как массив из 16 байт, и выдает 16-битную маску, каждый бит которой копируется из старшего бита соответствующего байта XMM-регистра.


Для теста на перевод строки мы просто используем инструкцию _mm_cmpeq_epi8, которая побайтно сравнивает входные параметры и выдает 128-битный результат, каждый байт которого имеет значение 0, если соответствующие входные байты неравны, и 255 в противном случае. На вход мы подаем переменную с обрабатываемыми символами и 128-битную переменную, каждый байт представляет символ перевода строки (для этого используем интринсик _mm_set1_epi8).


С тестом на пробельный символ все немного сложнее. Дело в том, что тест больше/меньше SSE может проводить только над знаковыми целыми числами. Это и делает инструкция _mm_cmplt_epi8 — проверяет, что байты первого аргумента строго меньше байтов второго и возвращает маску аналогично _mm_cmpeq_epi8. Но это значит, что любой символ, код которого равен 128 и выше, такая проверка назовет пробельным. Поэтому мы маскируем подобные символы с помощью инструкции _mm_andnot_si128. Она принимает на вход два 128-битных регистра и возвращает результат побитового "И" второго аргумента и инвертированного первого. Мы передаем текущие символы как первый аргумент и результат _mm_cmplt_epi8 как второй. У символов с кодом больше 128 установлен старший бит, следовательно в результате соответствующие старшие биты будут обнулены — это как раз то, что нам нужно для movemask'а.


Дальше мы уже оперируем с битовыми масками в обычных регистрах. Вычисляем маску was_space_mask, используя is_space_mask и флаг was_space, комбинируем маски так же, как в элементарном варианте и используем __popcnt что бы узнать количество установленных бит, которое аккумулируем в результат. В конце обновляем was_space флаг, устанавливая его в старший бит маски.


Поподробнее узнать об SIMD-инструкциях платформы x86 лучше здесь.


Результаты тестирования:


Words: 44774640; lines: 15000010; chars: 1871822228
Elapsed time: 165.704300ms

В 11 раз быстрее! Неплохой результат.


Еще SIMD-магия


Но, как говорится в известной пословице, между SSE и AVX перерывчик небольшой. Давайте попробуем обрабатывать 32 символа за раз с помощью инструкций AVX/AVX2:


void word_count_avx( unsigned char* const str, uint32_t size, stats& result )
{
    uint32_t lines = 1;
    uint32_t words = 0;

    bool was_space = false;

    for ( uint32_t i = 0; i < size; i += 32 )
    {
        _mm_prefetch( (char*)str + i + 32 * 64, _MM_HINT_NTA );

        __m256i const c = _mm256_load_si256( (__m256i*)( str + i ) );
        uint32_t is_newline_mask = _mm256_movemask_epi8(
            _mm256_cmpeq_epi8( c, _mm256_set1_epi8( '\n' ) )
        );

        uint32_t is_space_mask = _mm256_movemask_epi8(
            _mm256_andnot_si256( c, _mm256_sub_epi8( c, _mm256_set1_epi8( ' ' + 1 ) ) )
        );
        uint32_t was_space_mask = ( was_space ? 1 : 0 ) | ( is_space_mask << 1 );

        lines += __popcnt( is_newline_mask );
        words += __popcnt( is_space_mask & ~was_space_mask );
        was_space = is_space_mask & 0x80000000;
    }

    words += !was_space ? 1 : 0;

    result.char_count = size;
    result.line_count = lines;
    result.word_count = words;
}

Как видно, по сравнению с SSE-вариантом смысловых изменений нет. Использованы инструкции AVX/AVX2, функционал которых аналогичен тем, что были использованы ранее. Инкремент в цикле теперь не 16, а 32, и маски теперь 32-битные.


Результат:


Words: 44774640; lines: 15000010; chars: 1871822228
Elapsed time: 109.606000ms

Примерно в полтора раза быстрее.


Prefetch


В подобных ситуациях, когда мы обрабатываем большие объемы памяти, производя не так уж много вычислений, и при этом доступ к памяти последовательный, стоит задуматься о том, что бы использовать интринсик _mm_prefetch. Данная инструкция указывает, что мы хотим в будущем использовать некоторый участок памяти, и что неплохо бы загрузить его в кеш (то, что происходит на самом деле, сильно зависит от модели процессора). Так что сделал небольшую вставку в AVX-вариант:


void word_count2( unsigned char* const str, uint32_t size, stats& result )
{
    // ..............

    for ( uint32_t i = 0; i < size; i += 32 )
    {
        _mm_prefetch( (char*)str + i + 32 * 64, _MM_HINT_NTA );

        // ..............
    }

    // ..............
}

Words: 44774640; lines: 15000010; chars: 1871822228
Elapsed time: 73.564900ms

То есть на каждой итерации цикла мы просим загрузить в кеш участок памяти, который будем обрабатывать через 64 итерации. Немного нечестная оптимизация, поскольку параметры я подстроил исключительно под себя; на других платформах на самом деле это может даже снизить производительность.


В этом месте я подумал, что мы уже наверняка почти что упираемся в пропускную способность RAM. И правда — если оставить в цикле только загрузку данных в регистр, цикл отрабатывает примерно за 70ms. Так что смысла оптимизировать дальше скорее всего нет (да и раньше его не было).


Выводы


Итак, в этой статье было показано, как можно действительно быстро считать слова в тексте. Другой вопрос, стоит ли это делать быстро, или лучше все таки делать это правильно на любых входных данных, при том что изначально 1.8 Gb текста обрабатывались за 10 секунд.


В любом случае, надеюсь, статья была интересной, и кто-то сможет почерпнуть для себя что-нибудь новое.


Веселых холиваров!