Как обычно пишут сервер, если не особо заботиться производительности? Программа запускается, затем начинает принимать входящие соединения от клиентов и для каждого клиента запускает новый поток, который занимается обслуживанием этого клиента. Если вы используете какой-нибудь, прости господи, Spring или Flask или там Poco, то он что-такое внутри себя и делает - разве что потоки можно переиспользовать, то есть брать из некого пула. Это всё довольно удобно, но не слишком эффективно. Скорее всего, ваши потоки, обслуживающие клиентов, живут недолго и большую часть времени ожидают либо получения данных от клиента, либо отправки их клиенту - то есть ждут возвращения системных вызовов. Создание потока ОС - довольно дорогая операция, как и переключение контекста между потоками ОС. Если вы хотите уметь обслуживать много клиентов эффективно, надо придумать что-то другое. Например, коллбеки. Но это довольно неудобно.

Другой вариант - использование неблокирующего ввода-вывода в комбинации с какой-то реализацией потоков пользовательского пространства (user-space threads или fibers). В этой статье я покажу, как написать всё это своими руками.

Весь код приведён в репозитории. Там есть 3 ветки: good-old-one-thread-impl содержит изначальную реализацию, hand-context представляет из себя версию с ручным переключением контекста и реализацией fiber-local переменных, остальные две ветки содержат попытку сделать так, чтобы эта реализация работала в нескольких потоках ОС. Весь код приведён только как proof-on-concept и содержит ошибки.

Вводные понятия

Что такое user-space потоки? Это потоки, в переключении которых ОС не принимает участия и про которые она ничего не знает. Они все могут работать в одном потоке ОС или же в нескольких (как горутины в Go, virtual threads в Java 19). Эти потоки реализуют идею кооперативной многозадачности: поток может быть переключён только если он сам попросит об этом. В нашем случае поток будет сниматься с выполнения, когда он ожидает некого ввода или вывода - пока он этого ожидает, будут выполняться другие потоки.

Каков интерфейс неблокирующего ввода-вывода в Linux? Обычно для ввода и вывода используются системные вызовы read и write. Есть и другие, но мы будет рассматривать именно эти, так как они наиболее общие: с помощью них можно работать с файлами, сетью, пайпами, так можно читать сигналы (см signalfd). Обыкновенно такие системные вызовы блокируются, покуда данные не будут прочитаны или записаны, но для файлового дескриптора можно настроить, чтобы он был неблокирующим - тогда системные вызовы read и write не будут блокироваться - а если данные прочитать или записать нельзя, возвращается ошибка EAGAIN. Осталось как-то научиться понимать, какие файловые дескрипторы готовы к вводу-выводу, чтобы не пробовать их вслепую.

Таких механизмов в Linux несколько: select(2), poll(2), epoll(7). Каждый из них предоставляет возможность сообщить ядру ОС, какие файловые дескрипторы нас интересуют, и ядро будет сообщать нам, какие из них готовы к вводу-выводу. select(2) устарел, и использовать я буду poll(2).

Реализация потоков

Поначалу для реализации потоков будем использовать boost::context, но потом мы от него избавимся. Библиотека boost::context позволяет нам переключать контексты. Нам же остаётся написать некоторую обёртку над контекстом и написать какой-то простой планировщик потоков.

Контекст выполнения (представляемый boost::context) представляет из себя состояние некого пока выполнения, то есть значения регистров и стек.

Реализация потока будет представлять из себя какой-то такой класс, хранящий собственно контекст, функцию, которую выполняет поток, информацию о том, было ли выполнение начато, было ли закончено, готов ли поток к выполнению.

класс FiberImpl
class FiberImpl: public std::enable_shared_from_this<FiberImpl> {
// внутри всё приватное, потому что я для удобства напишу бёртку над 
// этой реализацией
private:
    explicit FiberImpl(const std::function<void()>& func);
    // тут ещё нужен, конечно, конструктор, принимающий function&&
    void join();
    bool isFinished() const;
    void start();
    bool isReady() const;
    friend class Fiber;
    friend class FiberManager;
    friend class CondVar;
    friend void sched_execution();
    void continue_executing();
    void suspend();

    std::function<void()> func;
    // контекст, который описывает состояние потока перед завершением
    continuation this_context;
    // контекст, выполнфвшийся перед выполнением потока
    continuation previous_context;
    // condition variable, на котором можно ждать завершения потока
    CondVar finish_cv;
    bool launched = false;
    bool finished = false;
    bool is_ready = false;
};

Наибольший интерес представляет то, как поток надо запускать. Когда поток запускается, следует вызвать функцию callcc из boost::context, которая принимает лямбда функцию и возвращает контекст, в котором переданная функция выполняется (callcc возвращается тогда, когда лямбда функция захочет переключить контекст). Ламбда функция должна принимать контекст, из которого её вызвали - с его помощью она может переключиться обратно в него.

Когда поток надо поставить на выполнение во второй раз, надо просто выполнить метод resume контекста - он начнёт выполнения данного контекста, а когда тот захочет переключиться обратно, функция вернёт новое состояние этого контекста.

запуск потока
void FiberImpl::continue_executing() {
    if (!launched) {
        launched = true;
        this_context = callcc([&](auto sink) {
            cerr << "starting func in new fiber\n";
            previous_context = std::move(sink);
            func();
            finished = true;
            finish_cv.notify_all();
            return std::move(previous_context);
        });
    } else {
        this_context = this_context.resume();
    }
}

Реализация остальных методов FiberImpl довольно тривиальна.

тривиальные функции
extern FiberManager fiberManager;

extern std::shared_ptr<FiberImpl> current_fiber;

FiberImpl::FiberImpl(const std::function<void()> &func) {
    this->func = func;
}

void FiberImpl::join() {
    while (!finished) {
        finish_cv.wait();
    }
}

void FiberImpl::start() {
    fiberManager.registerFiber(this->shared_from_this());
    is_ready = true;
}

void FiberImpl::suspend() {
    previous_context = previous_context.resume();
}

void sched_execution() {
    current_fiber->suspend();
}

void startFiberManager() {
    fiberManager.work();
}

Менеджер потоков должен уметь хранить список потоков, готовых к выполнению, и заниматься запуском этих потоков.

менеджер потоков
using std::shared_ptr;

void startFiberManager();

class FiberManager {
    friend class Fiber;
    friend class FiberImpl;
    friend class CondVar;
    friend void sched_execution();
    void work();
    void registerFiber(const shared_ptr<FiberImpl>& fiber_ptr);

    list<shared_ptr<FiberImpl>> ready_fibers;

    friend void startFiberManager();
};

// обратите внимание, что менеждер хранит только готовые к выплнению 
// потоки. остальные должны храниться ещё где-то
FiberManager fiberManager;

std::shared_ptr<FiberImpl> current_fiber;

void FiberManager::work() {
    while (!ready_fibers.empty()) {
        auto iterator = this->ready_fibers.begin();
        while (iterator != ready_fibers.end()) {
            current_fiber = *iterator;
            if (current_fiber->isReady()) {
                current_fiber->continue_executing();
            }
            if (current_fiber->isFinished()) {
                iterator = ready_fibers.erase(iterator);
            } else {
                iterator++;
            }
        }
    }
}

void FiberManager::registerFiber(const shared_ptr<FiberImpl>& fiber_ptr) {
    ready_fibers.push_back(fiber_ptr);
}

Класс FiberImpl в использовании не очень удобен: надо создавать shared_ptr, функция должна быть типа void(void), а ещё надо вызывать .start() . Напишем простую обёртку над ним.

обёртка
class Fiber {
public:
    template<typename Callable, typename... Args>
    explicit Fiber(const Callable& function, const Args&... args) {
        fiber_ptr = shared_ptr<FiberImpl>(new FiberImpl([&] () {
            function(args...);
        }));
        fiber_ptr->start();
    }

    void join() {
        fiber_ptr->join();
    }

    void detach() {
        fiberManager.work();
    }

    bool isFinished() {
        return fiber_ptr->isFinished();
    }

    bool isReady() {
        return fiber_ptr->isReady();
    }

private:
    shared_ptr<FiberImpl> fiber_ptr;
};

Осталось написать некоторые примитивы синхронизации.

condition variable
class FiberImpl;

class CondVar {
public:
    void wait();
    void notify_one();
    void notify_all();

private:
    std::vector<shared_ptr<FiberImpl>> waiters;
};

Интерфейс тривиален. Внутри храним список потоков, которые ждут на этой condition variable.

extern FiberManager fiberManager;
extern std::shared_ptr<FiberImpl> current_fiber;

void CondVar::wait() {
    waiters.push_back(current_fiber);
    current_fiber->is_ready = false;
    current_fiber->suspend();
}

void CondVar::notify_one() {
    if (!waiters.empty()) {
        auto fiber_ptr = *waiters.rbegin();
        fiber_ptr->is_ready = true;
        waiters.pop_back();
    }
}

void CondVar::notify_all() {
    for (auto& fiber_ptr : waiters) {
        fiber_ptr->is_ready = true;
    }
    waiters.clear();
}

Когда поток хочет ждать, он добавляется в список, помечается как неготовый к выполнению и приостанавливается. Когда кто-то хочет разбудить один или много потоков, они помечаются как готовые к выполнению и удаляются из списка.

Обратите внимание, что пока что у нас все потоки выполняются в одном потоке ОС, а переключаются в контролируемые нами моменты времени, поэтому думать о какой-то синхронизации внутри condition variable не нужно.

Второе, на что можно обратить внимание - это то, что обычно на condition variable ждут, взяв мьютекс, а condition variable его освобождает и затем захватывает снова. Однако, в условиях кооперативной многозадачности мьютекс нам вообще не нужен. Кроме того, на самом деле системный вызов futex, с помощью которого мьютекс реализуется, как раз и представляет из себя некое подобие condition variable. Также, в отличии от std::condition_variable, никаких spurious wake ups у нас быть не может.

Неблокирующий ввод-вывод

Теперь нам нужно написать реализацию ожидания готовности файловых дескрипторов к вводу-выводу. Будет некоторый отдельный поток, который будет выполнять системный вызов poll для получения информации о том, что файловый дескриптор готов.

Поток, желающий подождать готовности файлового дескриптора, будет создавать condition variable, сохранять свой запрос на ожидание ввода-вывода и засыпать на этом condition variable.

реализация
struct FdRequest {
    CondVar cv;
    int fd;
    short events;
};

class Waiter {
public:
    Waiter();

    static int wait(int fd, short events);

    static void stop();

    static void loop();

private:

    CondVar cv;
    unordered_map<int, FdRequest*> map;
    bool stopped = false;
};
Waiter waiter;

int Waiter::wait(int fd, short events) {
    FdRequest fdRequest{.cv = CondVar(), .fd = fd, .events = events};
    waiter.map[fd] = &fdRequest;
    waiter.cv.notify_one();
    fdRequest.cv.wait();
    return fdRequest.events;
}

Waiter::Waiter() {
    std::cout << "waiter initialising" << std::endl;
}

[[maybe_unused]] Fiber fiber(Waiter::loop);

void Waiter::stop() {
    waiter.stopped = true;
}

void Waiter::loop() {
    while (!waiter.stopped) {
        while (waiter.map.empty()) {
            waiter.cv.wait();
        }
        std::vector<pollfd> request;
        request.reserve(waiter.map.size());
        for (auto& elem : waiter.map) {
            request.push_back(pollfd{.fd = elem.first, 
              .events = elem.second->events, 
              .revents = 0});
        }
        int ret = poll(&request[0], request.size(), 100);
        if (ret < 0) {
            printf("poll returned with error %s", strerror(errno));
            continue;
        }
        for (auto& elem : request) {
            if (elem.revents > 0) {
                waiter.map[elem.fd]->cv.notify_one();
                waiter.map.erase(elem.fd);
            }
        }
        sched_execution();
    }
}

Пишем сервер

Теперь можно написать сервер. Он будет работать так: при запуске будет запускаться поток, который будет создавать сокет, связывать его с портом и дальше начинать принимать на нём соединения от клиентов. При этом перед вызовом accept для приёма соединений следует использовать Waiter для ожидания наличия входящих соединений.

При появлении нового соединения, будет запускаться новый поток, который будет ожидать ввода от клиента, а потом писать обратно в сокет ровно то, что было прочитано.

реализация
void worker(int fd) {
    printf("work called with fd: %d\n", fd);
    char buf[1024];
    while (true) {
        Waiter::wait(fd, POLLIN);
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n == 0) {
            printf("client finished, leaving\n");
            return;
        }
        if (n < 0) {
            printf("in worker error: %s\n", strerror(errno));
            return ;
        }
        int wrote = 0;
        while (wrote < n) {
            Waiter::wait(fd, POLLOUT);
            ssize_t m = write(fd, buf + wrote, n - wrote);
            if (m < 0) {
                printf("in worker error: %s\n", strerror(errno));
                return ;
            }
            wrote += m;
        }
    }
}

int main() {
    Fiber main_fiber([] () {
        std::cout << "main enetered" << endl;
        Fiber global_fiber([]() {
            int socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
            if (socket_fd < 0) {
                printf("socket error: %s\n", strerror(errno));
                exit(0);
            }
            int ret = fcntl(socket_fd, F_SETFL, O_NONBLOCK);
            if (ret == -1) {
                printf("fcntl error: %s\n", strerror(errno));
                exit(0);
            }
            sockaddr_in sin{};
            sin.sin_family = AF_INET;
            sin.sin_port = htons(8001);
            sin.sin_addr = in_addr{0};
            if (bind(socket_fd, reinterpret_cast<const sockaddr *>(&sin), sizeof(sin)) < 0) {
                printf("bind error: %s\n", strerror(errno));
                exit(0);
            }
            if (listen(socket_fd, 10) < 0) {
                printf("listen error: %s\n", strerror(errno));
                exit(0);
            }
            while (true) {
                printf("accepting\n");
                Waiter::wait(socket_fd, POLLIN);
                int client_fd = accept4(socket_fd, nullptr, nullptr, SOCK_NONBLOCK);
                Fiber thread(worker, client_fd);
            }
        });
    });
    startFiberManager();
    main_fiber.join();
}

Запускаем - работает! Поток ОС один, а обслуживаем мы многих клиентов параллельно и код пишем для многих потоков.

Многопоточность

Вообще-то, подход, при котором все user-space потоки выполняются в одном потоке ОС, во-первых, не масштабируется, а во-вторых, приводит к задержкам для всех клиентов в случае, если обслуживание одного из клиентов требует значительного процессорного времени.

Я пытался преодолеть это ограничение, запуская менеджер потоков в нескольких потоках ОС и соответственно подготовив все классы.

С одной стороны, ничего принципиально нового для этого написать не пришлось, а с другой стороны, полностью отладить мне это так и не удалось, поэтому подробнее об этом не будем.

Избавляемся от boost::context

Теперь пришло время научиться всё делать своими руками и избавиться от boost::context. Для этого нам надо создать структуру, в которой мы будем хранить контекст, написать код для его создания и для переключения.

код
class Context {
public:
    static const ssize_t STACK_SIZE = 4096 * 2;

    Context() = default;
    Context(const Context&) = delete;
    Context& operator = (const Context&) = delete;
    Context(Context&& other);
    Context& operator = (Context&& other) noexcept;
    static Context create_context();
    ~Context();

    void setRip(unsigned long rip);

private:
    unsigned long rbx;
    unsigned long rsp;
    unsigned long rbp;
    unsigned long r12;
    unsigned long r13;
    unsigned long r14;
    unsigned long r15;
    unsigned long rip;
};

extern "C" {
/*
 * saves current context into old_context_dest and loads new_context
 */
extern void switch_context(Context* old_context_dest, Context* new_context, unsigned long first_arg = 0);
}

Функция switch_context получает по указателю два контекста. Текущий контекст сохраняется по первому указателю, после чего начинает выполняться контекст из второго аргумента, но перед этим третий аргумент (first_arg) записывается в регистр %rdi - это нужно для того, чтобы, когда в контексте первый раз запускается функция, ей можно было бы передать первый аргумент.

Реализация функции будет на ассемблере.

switch_context:
// saving current context
    mov     %rbx, (%rdi)
    mov     %rsp, 8(%rdi)
    mov     %rbp, 16(%rdi)
    mov     %r12, 24(%rdi)
    mov     %r13, 32(%rdi)
    mov     %r14, 40(%rdi)
    mov     %r15, 48(%rdi)
// адрес, с которого эта функция была вызвана, записывается в поле
// rip, так чтобы при переключении на этот контекст, начал выполняться 
// код после выполнения switch_context
    mov     (%rsp), %rax
    mov     %rax, 56(%rdi)

    mov     %rsi, %rdi
// restoring other context
    mov     (%rdi), %rbx
    mov     8(%rdi), %rsp
    mov     16(%rdi), %rbp
    mov     24(%rdi), %r12
    mov     32(%rdi), %r13
    mov     40(%rdi), %r14
    mov     48(%rdi), %r15
    mov     56(%rdi), %rax
    mov     %rax, (%rsp)
    mov     %rdx, %rdi
// значение поля rip записывается на вершину стека, чтобы ret вернула 
// нас ровно туда
    ret

Создание нового контекста тривиально - нужно выделить память для стека и записать в rsp.

Fiber local переменные

Изначально код написанного сервера мог работать только под Linux. Когда мы избавились от boost, мы ограничили себя архитектурой x86/64. Сейчас мы ограничим себя ещё больше - код будет требовать процессора, поддерживающего инструкции fsgsbase и достаточно новой версии Linux.

Я пока не разобрался досконально с работой thread local переменных, поэтому сразу прошу прощения.

Как работают thread local переменные? Я об этом уже писал немного подробнее здесь и там же приводится код для добавления поддержки thread local переменных в учебную ОС. Сейчас просто скажу, что по умолчанию компилятор обращается к thread local переменным по некоторому сдвигу от значения сегментного регистра %fs. Соответственно, для того, чтобы они работали, код, запускающий поток, должен выделять некоторую память для thread local storage, инициализировать её и записывать адрес этой памяти в %fs.

Но нам в первом приближении ничего не мешает сделать тоже самое для того, что бы все переменные объявлянные как thread_local, работали как fiber_local.

Вообще-то, раньше пользовательские программы не могли писать в %fs напрямую - это могло делать только ядро и для этого пользовательская программа должна была выполнять системный вызов. Затем в процессорах появились инструкции rdfsbase, wrfsbase, rdgsbase и wrfsbase, но для того, чтобы они работали, ядро ОС должно явно их разрешить, поэтому для их использования нужно довольно современное ядро.

Итак, нам нужно:

  • добавить в класс Context значение %fs

  • сохранять и восстанавливать %fs при переключении контекста в функции switch_context

  • читать ELF-файл исполняемого файла, чтобы узнать размер TLS и чем его инициализировать.

  • При создании контекста, выделять память для TLS, инициализировать и записывать в %fs адрес этой памяти

Надо, однако, отметить, что thread_local переменные использует не только наш код, но и libstdc, поэтому нужно детально разбираться, как сделать поддержку fiber local переменных так, чтобы в библиотеках, с которыми наш бинарник линкуется, thread_local переменные тоже бы работали. Или был бы неплохо, если бы наши fiber local переменные объявлялись бы не как thread_local, а как fiber_local, и компилятор использовал бы для них не %fs, а %gs - потому что переменные из libstdc могут вполне быть thread local, а не fiber local

Тем не менее, если аккуратно сделать шаги, описанные выше, то fiber local переменные заработают. Ну не чудо ли?

Комментарии (16)