image

В Go есть приятная утилита для синхронизации, именуемая WaitGroup, которую могут ожидать одна или несколько горутин. Это требуется для конкурентного завершения задач. В других языках обычно действует следующее соглашение по завершению задач: нужно объединять потоки, выполняющие работу. В Go горутины не имеют ни значений, ни дескрипторов, поэтому WaitGroup применяется вместо объединений. Собирание WaitGroup на основе типичных портируемых примитивов – путаное дело, в которое вовлечены конструкторы и деструкторы; также в процессе этой работы приходится управлять временами жизни. Однако, как минимум, под Linux и под Windows, можно построить WaitGroup из целого числа (инициализируемого в значении), во многом как при создании 32-разрядной очереди и 32-разрядного барьера.

В случае, если вы не знакомы с типичными случаями использования WaitGroup в Go – они таковы:

var wg sync.WaitGroup
for _, task := range tasks {
    wg.Add(1)
    go func(t Task) {
        // ... выполняем задачу ...
        wg.Done()
    }(task)
}
wg.Wait()

Я инициализирую WaitGroup с нулевым значением, и главная горутина увеличивает счётчик на единицу перед началом каждой очередной горутины, решающей задачу. Как только такая горутина справится со своей задачей, она уменьшит счётчик на единицу, а главная горутина при этом будет дожидаться, пока её счётчик не достигнет нуля. Я стремился реализовать тот же самый механизм на C:

void workfunc(task t, int *wg)
{
    // ... выполняем задачу ...
    waitgroup_done(wg);
}


int main(void)
{
    // ...
    int wg = 0;
    for (int i = 0; i < ntasks; i++) {
        waitgroup_add(&wg, 1);
        go(workfunc, tasks[i], &wg);
    }
    waitgroup_wait(&wg);
    // ...
}

Справившись с задачей, WaitGroup возвращается к нулю, и никакая очистка при этом не требуется.

Далее я собираюсь немного развить этот пример. Поскольку значение и контекст WaitGroup указаны явно, можно инициализировать WaitGroup с любым неотрицательным количеством задач! Иными словами, waitgroup_add опционально, если общее количество задач известно заранее.

    int wg = ntasks;
    for (int i = 0; i < ntasks; i++) {
        go(workfunc, tasks[i], &wg);
    }
    waitgroup_wait(&wg);

Если хотите заглянуть в полный код — смотрите: waitgroup.c

Четыре элемента (синхронизации)


Чтобы построить такую WaitGroup, нам понадобятся четыре примитива на хост-платформе, и каждый из этих примитивов будет оперировать int. Первые два представляют атомарные операции, а вторые два взаимодействуют с системным планировщиком. Чтобы портировать WaitGroup на платформу, необходимо всего лишь реализовать четыре этих функции; как правило, каждая из них укладывается в одну строку.

static int load(int *); // атомарная загрузка
static int addfetch(int *, int); // атомарное сложение с последующей выборкой
static void wait(int *, int); // дожидаться изменения по адресу
static void wake(int *); // разбудить всех ожидающих по адресу

Первые две функции, должно быть, самоочевидны. Функция wait ожидает указателя, направленного на целое число, чтобы изменить его значение, а второй её аргумент – это ожидаемое актуальное значение. Планировщик дважды проверит целое число, прежде, чем отправлять поток спать, на случай, если в последний момент значение изменится. Иными словами, имеем дело с атомарной операцией «проверить-а-потом-возможно-заснуть». Функция wake отвечает за вторую половину работы. Поток, после того, как изменит целое число, при помощи этой функции пробуждает все остальные потоки, ожидавшие изменения того числа, на которое был направлен указатель. В целом весь этот механизм известен под названием фьютекс.

Я собираюсь немного упростить семантику WaitGroup, чтобы моя реализация стала ещё легче. WaitGroup в Go допускает сложение отрицательных чисел, и метод Add, в сущности, выполняет двойную работу. В моей версии сложение отрицательных чисел запрещено. Таким образом, операция “add” – это просто атомарное увеличение на единицу:

void waitgroup_add(int *wg, int delta)
{
    addfetch(wg, delta);
}

Поскольку счётчик здесь невозможно довести до нуля, делать больше ничего не требуется. Но операция “done” может уменьшить счётчик до нуля:

void waitgroup_done(int *wg)
{
    if (!addfetch(wg, -1)) {
        wake(wg);
    }
}

Если при атомарном уменьшении счёт был доведён до нуля, это означает, что мы завершили работу над последней задачей – то есть, необходимо разбудить ожидающих. Мы не знаем, есть ли в самом деле такие ожидающие, но это нормально. В некоторых вариантах применения фьютексов удаётся обойтись без относительно дорогостоящего системного вызова, если ожидающих нет – то есть, не тратить время на системный вызов для каждой разблокировки мьютекса, за который не конкурируют. Но в типичном варианте использования WaitGroup мы рассчитываем, что ждущий найдётся, когда счёт, наконец, дойдёт до нуля. Это распространённый случай.

Самая сложная из трёх операций – ожидание:

void waitgroup_wait(int *wg)
{
    for (;;) {
        int c = load(wg);
        if (!c) {
            break;
        }
    wait(wg, c);
    }
}

Сначала проверяем, не равен ли уже счёт нулю, и если равен – возвращаемся. В противном случае используем фьютекс, чтобы дождаться, пока счёт изменится. К сожалению, мы стремимся получить не совсем такую семантику, а возможность ожидать определённого целевого значения. Ожидания это не нарушает, но потенциально может оказаться неэффективным. Если поток справится с задачей между нашими операциями загрузки и ожидания, мы не отправим его спать, а поручим ему вторую попытку. Но на практике я конкурентно прогонял через эту штуку тысячи потоков и так и не встретил подобного «промаха». Насколько я могу судить, такой случай настолько редок, что его можно не учитывать.

Если бы такой момент действительно нас беспокоил, то WaitGroup можно было бы сделать из пары целых чисел: счётчика и защёлки, которая может быть равна 0 или 1. Ожидающие ждут защёлку, а сама защёлка (атомарно) модифицируется, когда счётчик доходит до нуля или сдвигается с нуля. Таким образом, ожидающие получают стабильное значение, которое можно ждать, и защёлка опосредует счётчик. Но, поскольку на практике это, видимо, значения не имеет, я предпочту элегантность и простоту WaitGroup, состоящей из единственного целого числа.

Четыре элемента: Linux


Когда в общем виде WaitGroup сделана, нам нужно предусмотреть те её части, которые зависят от конкретных платформ. Как GCC, так и Clang поддерживают атомики в стиле GNU, так что я не буду задумываться о компиляторе, а просто предположу, что они доступны под Linux. Первые две функции обёртывают эти встроенные элементы:

static int load(int *p)
{
    return __atomic_load_n(p, __ATOMIC_SEQ_CST);
}


static int addfetch(int *p, int addend)
{
    return __atomic_add_fetch(p, addend, __ATOMIC_SEQ_CST);
}

Для wait и wake нам понадобится системный вызов futex(2). В попытке показать нежелательность их прямого использования, glibc не обёртывает этот системный вызов в функцию, поэтому мы должны выполнить системный вызов сами.

static void wait(int *p, int current)
{
    syscall(SYS_futex, p, FUTEX_WAIT, current, 0, 0, 0);
}


static void wake(int *p)
{
    syscall(SYS_futex, p, FUTEX_WAKE, INT_MAX, 0, 0, 0);
}

INT_MAX означает «разбудить стольких, скольких получится». Другое распространённое значение – 1, соответствует «разбудить единственного ждущего». Кроме того, эти системные вызовы не могут информативно отказать, так что и возвращаемое значение проверять не требуется. Если wait проснётся рано (напр., EINTR), то он всё равно перепроверит счётчик. Фактически, если ваше ядро старше 20 лет, и оно появилось задолго до фьютексов, то оно всё равно вернёт ENOSYS («функция не реализована»). В таком случае функция всё равно будет работать корректно, пусть и крайне неэффективно.

Четыре элемента: Windows


Windows не поддерживала фьютексов вплоть до версии Windows 8, вышедшей в 2012 году. В 2020 году по-прежнему поддерживались версии Windows без фьютексов, поэтому на данной платформе фьютексы до сих пор можно считать относительной новинкой. Тем не менее, они уже достаточно зрелые, чтобы мы могли считать их готовыми к использованию.

Мне хотелось бы, чтобы поддерживались как GCC-шные компиляторы (через Mingw-w64), так и MSVC-шные. Mingw-w64 предоставляет совместимый intrin.h, так что я могу продолжать работать с MSVC-шными атомиками и покрыть оба варианта сразу. С другой стороны, MSVC не требует, чтобы атомики для int (или даже int32_t) были строго long, поэтому я могу подмахнуть здесь небольшое приведение. (напомню: sizeof(long) == sizeof(int) во всех версиях Windows, поддерживающих фьютексы.) Другой вариант – применить с WaitGroup определение typedef, так, чтобы получился int под Linux (для фьютексов) и long под Windows (для атомиков).

static int load(int *p)
{
    return _InterlockedOr((long *)p, 0);
}


static int addfetch(int *p, int addend)
{
    return addend + _InterlockedExchangeAdd((long *)p, addend);
}

Официально разрешённые к использованию фьютексные функции — WaitOnAddress и WakeByAddressAll. Они уже были в kernel32.dll, но на момент написания оригинала этой статьи они живут в API-MS-Win-Core-Synch-l1-2-0.dll, связанные через -lsynchronization. Грязно. Поскольку я этого не перевариваю, вместо них я вызываю низкоуровневые RTL-функции прямо по месту реализации: RtlWaitOnAddress и RtlWakeAddressAll. Они живут прямо по соседству с ntdll.dll, что очень удобно. Насколько мне известно, они не документированы, но, к счастью, нам на помощь приходит Wine, где предоставляется не только документация по ним, но и несколько вариантов реализации. Почитать этот материал познавательно; он подсказывает, как сконструировать фьютексы в тех системах, где они пока отсутствуют.

Эти функции не объявляются ни в каких заголовках, поэтому о них мне пришлось позаботиться самостоятельно. Положительный момент здесь заключается в том, что пока мне удавалось обходиться без существенных издержек на включение windows.h, так что я и дальше могу без него обходиться. Эти функции всё-таки перечислены в библиотеке импорта ntdll.dll, поэтому мне не приходится самому изобретать записи такой библиотеки.

__declspec(dllimport)
long __stdcall RtlWaitOnAddress(void *, void *, size_t, void *);
__declspec(dllimport)
long __stdcall RtlWakeAddressAll(void *);


Достаточно удобно, что эта семантика идеально соответствует семантике фьютексов в Linux!

static void wait(int *p, int current)
{
    RtlWaitOnAddress(p, &current, sizeof(*p), 0);
}


static void wake(int *p)
{
    RtlWakeAddressAll(p);
}

Как и в случае с Linux, информативного отказа тут не добиться, так что возвращаемые значения нас не интересуют.

Вот и вся реализация. Если рассчитывать всего на одну платформу – получается гибкое, легковесное и удобное в использовании средство синхронизации. Как по мне, оно весьма впечатляет, учитывая, что его удалось реализовать примерно в 50 строках относительно простого кода!

Комментарии (0)