Эта статья основана на истории об одном личном челлендже. Нужно было помочь с реализацией взаимодействия компонентов в программном комплексе заказчика. Иными словами, понадобилось IPC для дальнейшего развития продукта. Вызов заключался в том, что в обязательных требованиях числилось использование чистого С. Другие языки, включая С++ и Go, не рассматривались. 

В итоге я получил интересный опыт системного и параллельного программирования. Меня зовут Алексей Буреев, я работаю старшим инженером по разработке ПО в YADRO, сейчас мой основной рабочий язык программирования — Golang. В этой статье я проведу небольшой эксперимент: покажу, как можно решить одну задачу с помощью разных языков программирования. Языку С я противопоставлю Go, в основе которого есть исторические наработки проектирования С. Для этого немного заглянем «под капот» стандартных типов данных, которые уже были заботливо созданы для нас разработчиками языка.

Если вы гофер, обратите внимание на митап по Go, который мы проводим в Нижнем Новгороде и онлайн. В числе тем обсудим, как использовать горутины грамотно и безопасно. Программа и регистрация — по ссылке.

В общей формулировке задача состояла в том, чтобы создать IPC, в основе которого клиент-серверная архитектура. Подразумевалось, что нагрузка на него может быть достаточно высокой: канал для общения между сервисами и компонентами мог использоваться для передачи большого объема данных, частота пересылаемых сообщений также ожидалась высокая. 

В статье я рассмотрю часть действий, которые представляют наибольший интерес для сравнения двух языков. К ним относится реализация пула с потоками, которые будут «пережевывать» запросы и данные, и логика event-driven. Это и рассмотрим далее.

Диаграмма пула потоков
Диаграмма пула потоков

Решение задачи в теории и сопутствующие вопросы

Отбросив все нюансы, в сухом остатке можно использовать один управляющий поток, который порождает и оркестрирует остальные потоки в пуле. Они, в свою очередь, каким-то образом получают задачи для обработки, выполняют их и ожидают следующие задач в фоне. Чтобы собрать работающий пул потоков, используем парадигму event-driven, чтобы данные передавались асинхронно, а ресурсы потреблялись грамотно: без простоев и излишков.

Создавать потоки в Linux даже на С, используя glibc, несложно — в составе библиотеки уже есть libpthread, предоставляющая нужный API. Однако для создания работоспособного прототипа, кроме простого порождения потоков, нужно решить еще ряд вопросов:

  • как выбирать свободный поток в пуле, готовый обработать запрос,

  • как пробудить поток и сообщить ему о новом запросе на обработку,

  • что если на очередном запросе не будет свободных потоков в пуле.

Далее в тексте постараемся их решить на двух языках программирования.

Реализуем парадигму event-driven

Как это работает на С

При использовании pthread из набора стандартной библиотеки С glibc в Linux доступен богатый выбор примитивов синхронизации: мьютексы, условные переменные, спинлоки и барьеры, которые обычно редко где используются в коде на С. Хотя похожий аналог очень популярен в Golang — речь идет о примитиве WaitGroup.

Кроме этого, есть дополнительный список API-вызовов для различных нужд. Например, есть Linux-специфичный системный вызов eventfd, обернутый в библиотечный вызов glibc. Он позволяет создавать приложения в парадигме event-driven. Eventfd — удобный инструмент, особенно для таких целей, как «ручное» управлением пулом потоков. Однако одной из неудобных особенностей этого системного вызова является аллокация отдельного файлового дескриптора для последующей обработки операций с событиями. Дескрипторы могут закончиться, если имеются граничные условия, в которых исчерпание случится из-за логики кода или используемой конфигурации.

Рассмотрим блок кода, который проиллюстрирует базовый принцип работы с данным примитивом:

struct args {
    int fd;
};

void *background_job(void *arg) {
    struct args *args = (struct args*)arg;
    // полезная обработка задачи, в нашем примере можно
    // обойтись sleep
    (void)sleep(3);

    // сообщаем о завершении и выходим из функции
    (void)printf("Exiting from thread.\n");
    (void)eventfd_write(args->fd, 1);

    return NULL;
}

int main(int argc, char *argv[]) {
    // дескриптор нашего фонового потока и атрибуты для
    // его создания
    pthread_t job;
    pthread_attr_t attr;

    // подразумеваем, что системные вызовы всегда успешны,
    // опускаем проверки для простоты
    int event = eventfd(0, EFD_CLOEXEC);
    eventfd_t event_value = 0;
    struct args args = { .fd = event };
    
    // будем создавать поток, состояние которого изначально detached
    (void)pthread_attr_init(&attr);
    (void)pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    // создаем поток
    (void)pthread_create(&job, &attr, background_job, (void*)&args);

    (void)pthread_attr_destroy(&attr);

    // в нашем случае поток detached, утечки ресурсов не произойдет,
    // если мы вдруг забудем вызвать pthread_join, однако
    // у нас все еще остается возможность дождаться завершения потока
    //
    // данный подход позволяет ожидать сразу нескольких потоков,
    // можно создать счетный eventfd дескрипторов или использовать
    // несколько дескрипторов и мультиплексирование
    (void)eventfd_read(event, &event_value);
    printf("Thread has finished.\n");

    (void)close(event);

    return EXIT_SUCCESS;
}

Описанный пример — искусственный, но демонстрирует ключевые возможности примитива. С помощью eventfd можно без труда использовать архитектуру event-driven в рамках более низкоуровневого, чем Go, языка программирования C. 

Как результат, данный вызов существенно снижает затраты на делегирования потокам пула задач на выполнение. Управляющему потоку достаточно просто «просигналить» в eventfd, и ОС сама разбудит целевой поток, который ожидает события на этом примитиве. Никаких костылей с холостыми циклами ожидания событий для обработки в потоках из пула создавать не придется.

Как и любой файловый дескриптор, eventfd можно легко сочетать с другими в мультиплексных вызовах, то есть можно комбинировать обработку разных событий на разных инстансах примитива. Этой фичей мы также воспользуемся для простоты: будем использовать дополнительный инстанс примитива для уведомления о дополнительных событиях — например, об остановке пула.

Изобразить это можно, используя следующие примеры исходного кода:

struct pool_thread_info {
    int job_ev;  // уникальный идентификатор дескриптора входящих событий
    int sock_fd; // дескриптор подключенного клиента
                 // зарезервируем здесь еще несколько полей структуры,
                 // рассмотрим их позже
};

const unsigned int workers_count = 4;
const int magic = 42;

int pick_thread(struct pool_thread_info *info) {
    // в зависимости от занятости потока возвращаем true (1) или false (0)
    // код функции рассмотрим позже
    return 1;
}

void wake_thread(struct pool_thread_info *info, int client_sock) {
    info->sock_fd = client_sock;
    eventfd_write(info->job_ev, 1);
}

void *control_thread(void *arg) {
    // структуры данных для потоков в пуле
    struct pool_thread_info threads[workers_count];
    // дескриптор слушающего серверного сокета
    int srv_sock = -1;

    // подразумеваем, что управляющий поток уже проделал необходимую работу для старта пула:
    // 1. Дескрипторы событий созданы и разложены по структурам потоков
    // 2. Потоки пула созданы, каждому потоку передана структура с его дескрипторами и прочими данными
    // 3. Серверный сокет был успешно забинжен

    while (magic) {
        int client_sock = accept4(srv_sock, NULL, NULL, SOCK_CLOEXEC);
        unsigned int iter = 0;

        for (; iter < workers_count; iter++) {
            if (pick_thread(&threads[iter])) {
                wake_thread(&threads[iter], client_sock);
            }
        }
    }
    (void)pthread_exit(NULL);
}

void *pool_thread(void *arg) {
    struct pool_thread_info *info = (struct pool_thread_info *)arg;
    eventfd_t ev;

    while (magic) {
        (void)eventfd_read(info->job_ev, &ev);
            // обрабатываем данные с переданного дескриптора
    }
    (void)pthread_exit(NULL);
}

Интересную визуализацию работы горутин я нашел в этом тексте.

Источник.

Идеологически задача с пробуждением целевого потока решена: использование примитива eventfd значительно облегчает возможность использования подхода event-driven в рамках нашего пула потоков на чистом C.

Как это работает на Golang

В сравнении с С язык Golang пропитан идеей event-driven. Он предоставляет более универсальные и удобные средства синхронизации для такой архитектуры — каналы. В Golang они везде и для всего. И что, пожалуй, самое важное: с небольшими оговорками можно сказать, что они полностью работают в пространстве пользователя. В отличие от eventfd, который обрабатывается ядром, как и все прочие операции с дескрипторами ОС.

Аналогично с мультиплексированием в С для каналов Golang можно использовать механизм select, который так же обрабатывается в пространстве пользователя рантаймом языка. Для наглядности приведу простой пример. Важно понимать, что он искусственный, в промышленной разработке используется context.context:

type(
    poolGoroutineInfo struct {
        reqCh  chan string
        stopCh chan bool
    }

    reqHandlerInfo struct {
        stopCh chan bool
    }

    connHandlerInfo struct {
        listener *net.Listener
        data     chan string
    }
)

const (
    bufferLen    = 1024
    workersCount = 4
)

var (
    workers [workersCount]poolGoroutineInfo
)

func poolWorker(info poolGoroutineInfo) {
    for {
        select {
        case request :=<-info.reqCh:
            // производим обработку запроса, при необходимости
            // можно воссоздать идеологически схожий принцип и через канал передавать
            // открытые соединения
            fmt.Println(request)
        case <-info.stopCh:
                return;
        }
    }
}

func connHandler(info connHandlerInfo) {
    pListener := info.listener
    defer (*pListener).Close()
    
    buffer := make([]byte, bufferLen)

    for {
        conn, err := (*pListener).Accept()
        if err != nil {
            // для простоты рассмотрим лишь случай явной остановки по запросу от ctx.Done
            return
        }
        length, _ := conn.Read(buffer)
        info.data <- string(buffer[:length])        
    }
}

func requestHandler(info reqHandlerInfo) {
    ctx, cancel := context.WithCancel(context.Background())
    lc := net.ListenConfig{}

    // создаем слушающий серверный сокет
    listener, _ := lc.Listen(ctx, "tcp", ":9090")
    
    // общий канал для данных, используемый connHandler для записи и poolWorker для чтения
    recvData := make(chan string)

    // подразумеваем что управляющая горутина уже проделала необходимую работу для старта пула:
    // 1. Канал для данных создан, один для всех горутин
    // 2. Горутины пула созданы, каждой горутине передана структура с каналами прочими данными
    // 3. Серверный сокет был успешно забинжен

    go connHandler(
        connHandlerInfo{
            listener: &listener,
            data: recvData,
        },
    )

    for {
        select {
        case <- info.stopCh:
            cancel()
            // Кроме остановки горутины, слушающей серверный сокет, останавливаем и горутины пула
            return;
        }
    }
}

Подробнее о мультиплексировании

Выше мы касались вопроса мультиплексирования — остановимся на этой теме.

Как это работает на С

На чистом C мы повсеместно имеем дело с файловыми дескрипторами, которые под собой внутри VFS прячут все детали реализации.

Механизм event-driven очень удобно сочетать c мультиплексированием — например, для контролирования механизма работы управляющего потока. В процессе работы он принимает входящие соединения, используя системный вызов accept/accept4. Однако есть нюанс: используя блокирующий сокет, наш код «подвиснет» до прихода следующего клиента или до первой ошибки на сокете. С другой стороны, использование неблокирующего сокета заставит нас ждать клиента в бесконечном цикле, активно занимая процессор, или использовать мультиплексные вызовы.

Давайте подробнее рассмотрим оба подхода. Первый вариант будет использовать блокирующий сокет, второй — неблокирующий.

Подход с блокирующим сокетом

#define PORT (8080)

int server_handler()
{
    int sockfd;
    int connfd;
    int len;
    struct sockaddr_in servaddr;
    struct sockaddr_in cli;
   
    // создаем серверный сокет
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {  
        return errno;
    }
    memset(&servaddr, 0, sizeof(servaddr));
   
   // будем слушать на всех интерфейсах используемый порт 8080
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);
   
    // биндим серверный сокет
    if ((bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) != 0) {
        return errno;  
    }
   
    if ((listen(sockfd, 1)) != 0) {
        return errno;
    }

    len = sizeof(cli);
   
    // вызываем accept в блокирующем режиме
    connfd = accept(sockfd, (struct sockaddr *)&cli, &len);
    if (connfd < 0) {
        return errno;
    } else {
        printf("server accepted the client\n");
    }
   
    // производим необходимые действия для обработки запросов клиента
    func(connfd);

    // закрываем клиентский сокет
    close(connfd);

    // закрываем серверный сокет
    close(sockfd);
}

В описанном коде используется подход с блокирующим сокетом. Как уже было сказано ранее, на вызове accept данный код просто «подвиснет» в ожидании подключения нового клиента. Иными словами при возникновении другого события внутри нашего сервера повлиять на заблокированный вызов можно будет только отправлением сигнала в целевой поток. А это может прервать другие системные вызовы.

Подход с неблокирующим сокетом

#define PORT (8080)
  
int server_handler()
{
    int sockfd;
    int connfd;
    int len;
    struct sockaddr_in servaddr;
    struct sockaddr_in cli;
   
    // создаем серверный сокет
    sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (sockfd < 0) {  
        return errno;
    }
    memset(&servaddr, 0, sizeof(servaddr));
   
   // будем слушать на всех интерфейсах, используемый порт 8080
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);
   
    // биндим серверный сокет
    if ((bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) != 0) {
        return errno;  
    }
   
    if ((listen(sockfd, 1)) != 0) {
        return errno;
    }

    len = sizeof(cli);
   
    // вызываем accept в неблокирующем режиме
    do {
        connfd = accept4(sockfd, (struct sockaddr *)&cli, &len, SOCK_NONBLOCK);
    } while ((connfd < 0) && (errno == EAGAIN));

    if (connfd < 0) {
        printf("failed to accept new client, errno = %s\n", strerror(errno));
        return errno;
    } else {
        printf("server accepted the client\n");
    }
   
    // производим необходимые действия для обработки запросов клиента
    func(connfd);

    // закрываем клиентский сокет
    close(connfd);

    // закрываем серверный сокет
    close(sockfd);
}

Оба подхода без использования мультиплексирования наглядно показывают проблему реализации на С и наталкивают на необходимость использования дополнительных механизмов для решения всплывающих проблем. 

В этом блоке кода мы используем уже другой подход к ожиданию подключения клиента с применением цикла do/while для обработки ситуации, когда подключенный клиент отсутствует. Такая логика позволяет не блокироваться на системном вызове accept4 и потенциально проводить дополнительные проверки между вызовами accept4

Однако возникает другая проблема: при отсутствии подключенных клиентов accept4 всегда возвращает ошибку и устанавливает errno в значение EAGAIN. Иными словами без подключенного клиента мы получаем обычный busy loop, неограниченно потребляющий ресурсы процессора. Проблему можно решить добавлением дополнительных вызовов семейства sleep или sched_yield, но в таком случае прием входящего соединения и его обработка произойдут с задержкой. 

Оба подхода без использования мультиплексирования наглядно показывают проблему реализации на С и наталкивают на необходимость использования дополнительных механизмов для решения всплывающих проблем. 

Основными мультиплексными вызовами в Linux являются select, poll и epoll

  • select — довольно устаревший, у него есть ограничения в работе и им неудобно пользоваться. 

  • epoll — наиболее современный и самый мощный механизм из трех, отлично подходит для работы с большим числом дескрипторов. 

  • poll — достаточно удобный для небольшого набора дескрипторов.

В прототипе будем пользоваться последним вызовом.

Проблема полной блокировки в ожидании клиента решается использованием дополнительного дескриптора eventfd в мультиплексном вызове. С ним ожидание нового клиента можно легко и явно прервать извне. Эту цель также можно достичь, используя сигнал (вариации ppoll и прочие), но в качестве примера код с дополнительным дескриптором выглядит нагляднее.

Простыми словами при необходимости любой внешний поток сможет прервать ожидание контролирующего потока для выполнения каких-либо других задач.

Ниже — пример подобного кода:

#define PORT (8080)
#define FDS_COUNT (2)

enum {
    event_fd_idx = 0,
    socket_fd_idx
};

int server_handler(int eventfd)
{
    int sockfd;
    int connfd;
    int len;
    int ret;
    struct sockaddr_in servaddr;
    struct sockaddr_in cli;

    struct pollfd poll_fds[FDS_COUNT];
   
    // создаем серверный сокет
    sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (sockfd < 0) {  
        return errno;
    }
    memset(&servaddr, 0, sizeof(servaddr));
   
   // будем слушать на всех интерфейсах используемый порт 8080
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);
   
    // биндим серверный сокет
    if ((bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) != 0) {
        return errno;  
    }
   
    if ((listen(sockfd, 1)) != 0) {
        return errno;
    }

    len = sizeof(cli);
   
    memset(&poll_fds, 0, sizeof(poll_fds));
    poll_fds[event_fd_idx] = (struct pollfd){ .fd = eventfd, .events = POLLIN, .revents = 0 };
    poll_fds[socket_fd_idx] = (struct pollfd){ .fd = sockfd, .events = POLLIN, .revents = 0 };

    // ожидаем события на файловых дескрипторах
    ret = poll(poll_fds, FDS_COUNT, -1);

    // обрабатываем потенциальные ошибки
    if (ret <= 0) {
        printf("poll returned unexpected value\n");
        return -1;
    }
    if (poll_fds[event_fd_idx].revents & (POLLERR | POLLNVAL | POLLHUP)) {
        printf("unexpected error flags found on eventfd descriptor\n");
        return -1;
    }
    if (poll_fds[socket_fd_idx].revents & (POLLERR | POLLNVAL | POLLHUP)) {
        printf("unexpected error flags found on socket descriptor\n");
        return -1;
    }

    // проверяем наличие событий на дескрипторах

    if (poll_fds[event_fd_idx].revents & POLLIN) {
        // обрабатываем внешнее пришедшее событие
    }

    if (poll_fds[socket_fd_idx].revents & POLLIN) {
        // принимаем входящее соединение от клиента
        // вызываем accept в неблокирующем режиме    
        connfd = accept4(sockfd, (struct sockaddr *)&cli, &len, SOCK_NONBLOCK);

        if (connfd < 0) {
            printf("failed to accept new client, errno = %s\n", strerror(errno));
            return errno;
        } else {
            printf("server accepted the client\n");
        }
   
        // производим необходимые действия для обработки запросов клиента
        func(connfd);

        // закрываем клиентский сокет
        close(connfd);        
    }    

    // закрываем серверный сокет
    close(sockfd);
}

Как это работает на Golang: взгляд на netpoll

Внутреннее устройства Golang уже не раз обсуждали на Хабре. Здесь я хотел бы кратко затронуть тему использования мультиплексирования в недрах рантайма Golang, в частности сущности netpoll.

Касаясь сущности netpoll, стоит упомянуть планировщик в Golang и исполняемые сущности — горутины. 

Программисты на C привыкли оперировать понятием потока (он же thread/LWP) как минимальной исполняемой сущности в рамках языка. С Golang дело обстоит интереснее: появляется еще один уровень исполняемых сущностей — горутины. Как и у потоков, у них есть собственный контекст выполнения. На первый взгляд, кажется, что они мало чем отличаются от потоков, но при детальном рассмотрении видны кардинальные различия.

Работа горутин контролируется планировщиком Golang — частью рантайма языка. Сами горутины не имеют абстракции на уровне ОС, они — минимальная единица исполнения. С точки зрения планировщика ОС горутины выполняются в контексте потоков.

Планировщик Golang решает следующие вопросы:

  • поверх какого ОС-потока будет выполняться горутина,

  • когда нужно дать горутине квант процессорного времени, 

  • а когда остановить выполнение и положить в очередь на ожидание. 

Очень похоже на работу планировщика в ядре ОС, только все работает в пространстве пользователя.

Интересно, что при старте приложения рантайм языка по умолчанию уже создает пул потоков, на котором будут выполняться горутины (данное поведение можно контролировать и изменять перед запуском приложения и уже во время работы).

Создание горутины — более легковесная операция, которая не вовлекает в работу планировщик ОС. То есть, например, можно легко обойтись без создания пула горутин — просто при входящих запросах извне каждый раз создавать новую. Ведь для работы горутин планировщик уже создал пул своих потоков. На этой идее построено множество фреймворков в Golang.

Вернемся к нашему решению. С точки зрения общего подхода логика работы нашего управляющего потока очень похожа на netpoll. Это отдельный поток, который обрабатывает ожидания, в частности входящих соединений. Внутри него используется самый продвинутый на данный момент мультиплексный вызов epoll. Он оперирует набором файловых дескрипторов от горутин, в том числе тех, что ожидают подключения новых клиентов. При возникновении событий на файловых дескрипторах (сокетах) соответствующие им горутины пробуждаются планировщиком рантайма языка.

Аналогично с дополнительным eventfd-дескриптором в нашем потоке netpoll в своем наборе обрабатываемых дескрипторов содержит один специальный — для внутренней логики работы. Например, вновь созданная горутина вызывает метод accept у net.Listener, но при этом входящих соединений от клиентов в данный момент нет. В этой ситуации необходимо сообщить netpoll о необходимости добавить в его набор дескрипторов еще один для отслеживания событий на нем. Для прерывания epoll и переконфигурации netpoll как раз используется данный служебный файловый дескриптор.

Подробнее с базовой логикой работы и устройством netpoll в Linux можно ознакомиться в файле.

Мьютексы, или как их можно «ненормально» использовать

Мьютексты нам понадобятся, чтобы построить более легковесный пул потоков. 

Разработчики привыкли использовать мьютексы для блокировки доступа к критичным данным, чтобы избежать состояния гонки.

Как это работает на С

API, предоставляемый glibc для C, позволяет производить не только захват и освобождение блокировки, но и операцию проверки ее занятости. Это достаточно важная деталь, которую используют редко. По этой теме сломано достаточно много копий в бесконечных холиварах о том, что нужно переосмыслить весь подход к архитектуре при первой же попытке использования функций вида trylock.

Давайте разберем небольшой пример, связанный с использованием синхронизации на основе trylock, на языке C:

struct pool_args {
    pthread_mutex_t lock;
    int stop_ev;
    int shared_data;
};

const unsigned int workers_count = 4;
const useconds_t worker_sleep_usec = 500000;
const unsigned int control_sleep_sec = 3;
 
void *pool_worker(void *arg) {
    uint64_t ev;
    struct pool_args *args = (struct pool_args *)arg;
    // подразумеваем, что read может вернуть либо количество прочитанных байт,
    // либо ошибку и errno = EAGAIN как индикацию того, что событие не было инициировано
    while ((read(args->stop_ev, &ev, sizeof(ev)) < 0) && (errno == EAGAIN)) {
        // пример работы с trylock: при неуспешной попытке захвата мьютекса вместо
        // ожидания применяем стратегию поллинга, данный код активно потребляет процессорное
        // время и близок по семантике к spinlock'у
        if (pthread_mutex_trylock(&(args->lock)) != 0) {
            continue;
        }
        // имитация полезной нагрузки
        (void)usleep(worker_sleep_usec);
        args->shared_data++;
        (void)pthread_mutex_unlock(&(args->lock));
    }
    return NULL;
}

int control_thread(void *arg) {
    // подразумеваем, что все системные и библиотечные вызовы успешны
    struct pool_args args = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .stop_ev = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE),
        .shared_data = 0
    };
    unsigned int iter = 0;
    pthread_t ids[workers_count];

    for (; iter < workers_count; iter++) {
        pthread_create(&ids[iter], NULL, pool_worker, (void *)&args);
    }

    // имитация работающего процесса
    (void)sleep(control_sleep_sec);

    // останавливаем наши фоновые потоки
    (void)eventfd_write(args.stop_ev, workers_count);

    // ждем их завершения во избежание утечки памяти
    for (iter = 0; iter < workers_count; iter++) {
        (void)pthread_join(ids[iter], NULL);
    }

    // освобождаем ресурсы
    (void)close(args.stop_ev);

    return NULL;
}

В этом примере можно увидеть общую идеологию использования метода trylock: если захватить мьютекс нельзя, просто переходим на новую итерацию цикла. Для простоты мы здесь использовали только один ресурс для доступа, в более сложных случаях их может быть несколько. В качестве развития идеи давайте использовать этот метод для определения потока в пуле, не занятого работой в данный момент.

Опишем общую концепцию работы потока в пуле: при получении задания после пробуждения из сна поток захватывает «свой» мьютекс и держит его на протяжении всей обработки запроса. Отпускает лишь в самом конце — и так циклично, на протяжении всего времени работы.

Контролирующий поток, в свою очередь, при приходе нового запроса вызовом trylock может достаточно быстро определить свободный поток в пуле. Так как у каждого потока в пуле есть «свой» мьютекс, который поток держит на протяжении обработки запроса, то при попытке вызова trylock с захваченным мьютексом ничего не случится — мьютекс мы не получим. В случае успеха и захвата мьютекса мы гарантированно находим поток, ожидающий новой работы.

В этом подходе дополнительно решена проблема гонки с доступом к служебным данным между управляющим потоком и потоком из пула в ситуации делегирования очередного запроса.

Давайте напишем часть исходного кода на C, используя описанный подход:

struct pool_thread_info {
    int job_ev;           // уникальный идентификатор дескриптора входящих событий
    int sock_fd;          // дескриптор подключенного клиента
    pthread_mutex_t lock; // мьютекс для определения статуса потока и синхронизации данных
};

const unsigned int workers_count = 4;
const int magic = 42;

int pick_thread(struct pool_thread_info *info) {
    // в зависимости от занятости потока возвращаем true (1) или false (0)
    // код функции рассмотрим позже
    int ret = pthread_mutex_trylock(&(info->lock));
    if (ret == 0) {
        // нам удалось выбрать поток
        return 1;
    } else {
        // рассматриваем случай EBUSY
        return 0;
    }
}

void wake_thread(struct pool_thread_info *info, int client_sock) {
    info->sock_fd = client_sock;
    (void)eventfd_write(info->job_ev, 1);
    (void)pthread_mutex_unlock(&(info->lock));
}

void *control_thread(void *arg) {
    // структуры данных для потоков в пуле
    struct pool_thread_info threads[workers_count];
    // дескриптор слушающего серверного сокета
    int srv_sock = -1;

    // подразумеваем, что управляющий поток уже проделал необходимую работу для старта пула:
    // 1. Дескрипторы событий созданы и разложены по структурам потоков
    // 2. Потоки пула созданы, каждому потоку передана структура с его дескрипторами и прочими данными
    // 3. Серверный сокет был успешно забинжен

    while (magic) {
        int client_sock = accept4(srv_sock, NULL, NULL, SOCK_CLOEXEC);
        unsigned int iter = 0;

        for (; iter < workers_count; iter++) {
            if (pick_thread(&threads[iter])) {
                wake_thread(&threads[iter], client_sock);
            }
        }
    }
    (void)pthread_exit(NULL);
}

void *pool_thread(void *arg) {
    struct pool_thread_info *info = (struct pool_thread_info *)arg;
    eventfd_t ev;

    while (magic) {
        // после поступления сигнала на обработку блокируем мьютекс, это позволяет достигнуть сразу двух целей
        // 1. Исходя из логики, получаем эксклюзивный доступ к атрибутам потока, например, к дескриптору соединения клиента
        // 2. В данном состоянии, исходя из логики, «сообщаем» управляющему потоку о том, что мы заняты в данный момент и не готовы принять новый запрос 
        (void)eventfd_read(info->job_ev, &ev);
        (void)pthread_mutex_lock(&(info->lock));
            // обрабатываем данные с переданного дескриптора
        (void)pthread_mutex_unlock(&(info->lock));
    }
    (void)pthread_exit(NULL);
}

Как это работает на Golang

В Golang мьютексы доступны «из коробки» как часть пакета sync. Они работает аналогичным образом, парадигма неизменна — взаимоисключающая блокировка. 

Но есть и различия: однажды на практике мы столкнулись с одной деталью: по сравнению с C-подобными языками в Golang невозможно создать рекурсивный мьютекс. Эта особенность хорошо стреляет в ногу пользователям Win32 API: CRITICAL_SECTION по умолчанию сразу рекурсивный, попытки использовать подобный принцип в Golang сразу приведут к дедлоку.

Вот пример, который демонстрирует эту особенность:

func foo(v *sync.Mutex) {
    fmt.Println("on lock")
    v.Lock()
    defer v.Unlock()
    foo(v)
}

func main() {
    v := sync.Mutex{}
    foo(&v)
}

У типа Mutex есть метод TryLock, который был добавлен сравнительно недавно по меркам языка — в версии 1.18. При этом у метода своя непростая история, с которой можно ознакомиться здесь.

Проблема, решаемая в рамках С c помощью trylock и мьютекса, в Golang разрешается гораздо грациознее с помощью каналов. Они «потокобезопасны» по дефолту и могут использоваться сами по себе для синхронизации: несколько горутин могут производить параллельные операции чтения из канала без состояния гонки. А свободные горутины из пула будут, как по взмаху волшебной палочки, сами выстраиваться в очередь за работой.

Что делать, если мы не нашли ни одного потока в пуле для обработки задачи

При переходе к теории мы отклонились от решения одного из озвученных в начале статьи вопросов: что делать, если при попытке обработки входящего соединения не нашлось ни одного свободного потока в пуле. Иными словами, все потоки заняты, и запрос не может быть обработан в данный момент.

Как это работает на С

Для данной задачи может быть несколько потенциальных решений, давайте рассмотрим и разберем их.

Первый и самый простой вариант — использование бесконечного цикла с ожиданием освобождения одного из потоков и постоянной проверки их состояния. За очевидностью этого решение кроется главный минус подхода: при высоких нагрузках и постоянной занятости потоков в пуле основной поток будет сильно потреблять процессорное время.

Второй вариант — отложенная обработка запроса. Мы просто помещаем запрос, пришедший на исполнение, во внутреннюю очередь и на этом завершаем начальную часть работы. Далее при обработке новых входящих запросов будем проверять очередь и, если найдем в нем уже имеющийся запрос, обработаем сначала его. Метод качественно решает проблему, описанную в первом варианте, — теперь основной поток не потребляет процессорное время впустую. Однако сгенерировались две новые проблемы:

  • с этого момента у нас начинается потребление памяти для сохранения очереди запросов,

  • если разрыв по времени между двумя соседними запросами будет достаточно большой, то необработанный запрос будет ждать все это время.

Первую проблему достаточно просто разрешить, ограничив общий объем памяти, используемый для сохранения запросов в очереди, — можно просто использовать кольцевую очередь. Со второй проблемой сложнее, поскольку мы не можем предсказать частоту прихода запросов. Однако мы можем эмпирически предположить, сколько всего времени тратится на обработку запроса, и использовать его как основу для третьего варианта.

Третий вариант — добавление еще одного примитива синхронизации к предыдущему решению. Поскольку нет никаких гарантий, что следующий запрос придет достаточно быстро, давайте использовать таймер с квантом, равным по времени обработке запроса потоком в пуле. Для примера возьмем квант в 10 миллисекунд. Если запрос не был изначально обработан, кладем его в очередь и засыпаем на указанный период, предполагая, что один из фоновых потоков ядра освободится за это время. Затем вновь пытаемся найти свободный поток. Если не найдем — снова заводим таймер, засыпаем и далее по кругу. 

Если следующий запрос придет раньше, чем сработает таймер, попытаемся найти свободный поток. В случае неудачи снова перейдем к использованию таймера, предварительно перепланировав его на более поздний срок относительно текущей временной точки.

Почему с Golang не пришлось бы решать эту проблему

В Golang данная проблема часто решается иначе — даже без использования пула потоков. Основная идея заключается в том, что создание и планирование горутины планировщиком рантайма — это гораздо более легкая операция по сравнению с традиционной работой с потоками.

Как я уже писал выше, при обработке нового входящего запроса проще создать новую горутину и поручить обработку запроса ей. Вместо того чтобы писать алгоритм для поиска свободной горутины из пула. Этот подход используется во множестве фреймворков, написанных для Golang — например, HTTP-сервер Echo. Каждый раз при обработке входящего соединения он просто создает новую отдельную горутину и далее выполняет обработку http-запроса уже в ней.

Порой встречаются случаи, где использование пула горутин может быть мотивировано конкретной задачей, но в подавляющем большинстве случаев хорошо себя зарекомендовал именно упомянутый механизм. Из-за удобства разработки и хорошей масштабируемости решения с созданием новых горутин внутри рантайма языка.

Итоги нашего «сравнительного анализа»

Используя даже самые простые механизмы в виде базовых примитивов синхронизации, предоставляемых операционной системой и стандартной библиотекой языка С, можно проектировать и создавать достаточно гибкие в плане функциональности многопоточные приложения. Однако за это приходится платить дополнительную цену: иногда приходится смешивать «вручную» и контролировать разные между собой сущности — например, мультиплексирование и синхронизацию. Необходимо учитывать различные тонкости и нюансы используемых объектов, чтобы избежать потенциальных ошибок. В многопоточных приложениях эта тема особенно сильно может себя проявлять.

С другой стороны, использование более высокоуровневого языка Go, позволяет существенно облегчить разработку за счет скрытия многих слоев абстракции от программиста и переносом их внутрь стандартной библиотеки и рантайма языка. Но и здесь могут крыться потенциальные трудности: при имеющихся изобилии и простоте важно понимать внутреннее устройство объектов и алгоритмов стандартной библиотеки, особенно для написания высоконагруженного кода. В противном случае можно столкнуться с нетривиальными проблемами, требующими комплексного подхода в отладке и профилировании.

В итоге, с одной стороны, мы получаем простоту методов, скрывающую детальную реализацию и различные абстракции под собой. А с другой — дополнительные нюансы, которые легко упустить из виду, хотя с их учетом можно получить более надежное решение. В качестве примера можно привести использование каналов в Golang: удобство их использования компенсируется дополнительными сложностями и условиями использования для корректной работы приложения.

Однако, несмотря на потенциальные трудности при использовании Golang, сам язык сравнительно простой и имеет более низкий порог входа, чем тот же C. Отдельно стоит упомянуть, что этот компилируемый язык изначально построен на идее многопоточности. Это позволяет ему стабильно оставаться нишевым языком в разработке микросервисной архитектуры для нагруженных систем, где важна скорость выполнения и не всегда подходят другие языки — например, тот же Python.

Общая мысль, которой можно завершить текст, в том, что одну и ту же задачу можно решить разными языками программирования. Иногда от выбора языка будет зависеть скорость разработки решения (эту тему мы развивать не будем, потому что статья больше про функциональные возможности синтаксиса языков). Но, как и везде, важно понимать нюансы и знать о подводных камнях. Например, опытный программист на С решит ту же задачу с распределением потоков лучше, чем начинающий Go-энтузиаст. Несмотря на то, что Go как язык можно считать максимально подходящим для решения. Очевидно, что выбор языка сильно зависит от поставленной задачи и уровня знаний специалистов, работающих в компании.

Комментарии (16)


  1. splinehip
    14.03.2024 10:29
    +2

    У вас один и тот же код для блокирующего и не блокирующего сокета


    1. enelv Автор
      14.03.2024 10:29
      +1

      UPD: спасибо! был один и тот же код, поправили. Сейчас актуальный вариант.
      Для блокирующего варианта используется простой вызов accept(), для неблокирующего варианта уже accept4 с флагом SOCK_NONBLOCK дополнительно обернутый в do/while для разрешения ситуации с EAGAIN.


      1. AndreyAf
        14.03.2024 10:29

        для не блокирующего я еще ставлю sleep - электричество экономлю


        1. enelv Автор
          14.03.2024 10:29

          Обычный простой sleep или вариации smart sleep из области мультиплексирования (select/poll/epoll)?


    1. yadro_team
      14.03.2024 10:29
      +2

      Спасибо за внимательность! Поправили


  1. Barabashkad
    14.03.2024 10:29
    +1

    пробежал бегло по тексту глаз зацепился за 2 утверждения

    ""При использовании pthread из набора стандартной библиотеки С glibc в Linux доступен богатый выбор примитивов синхронизации: мьютексы, условные переменные, спинлоки и барьеры, которые обычно редко где используются в коде на С ""

    что значит редко ... по надобности использутся, если аппликация multithreaded то без них нельзя
    и да, как говориться в С так носят, конечно сегодня выбор С для написания чего либо не всегда очевиден, но когда кроме него ничего не было , по другому и нельзя.
    Все эти примитивы широко используются везде и всегда


    "Используя даже самые простые механизмы в виде базовых примитивов синхронизации, предоставляемых операционной системой и стандартной библиотекой языка С, можно проектировать и создавать достаточно гибкие в плане функциональности многопоточные приложения. "

    все языки в итоге компилирутся/интерпретируються и работают на конкретной опреционой системе
    и ничем кроме того что операционная система предостволяет пользоваться не могут ..
    примитивы С максимально приближены к ОС
    все остальное реализовано через них и поверх них
    так что Go тоже пользуеться ими же


    1. enelv Автор
      14.03.2024 10:29
      +1

      Спасибо за размышления!
      Из личного опыта мысль про редкость использования относится в основном к pthread барьерам, остальные примитивы используются почти повсеместно. Вероятно оттого что данный примитив не так сильно распространен и используется для ограниченного круга задач. Встречались различные вариации других примитивов явно напоминающие использование барьера, например, счетный вариант (EFD_SEMAPHORE) eventfd или сами семафоры в чистом виде.

      Не полностью понял часть комментария про примитивы С и основу ОС.
      Про привязку примитивов синхронизации к ОС вопрос достаточно обширный только потому что у Golang необходимо их рассматривать как часть рантайма: там в целом очень большое число оптимизаций для того чтобы избегать накладных расходов в вызовы ОС и обходиться поддержкой уровня горутин. Сам golang в чистом виде не использует вызовы С для имплементации примитивов синхронизации (системные вызовы, например, futex тут не рассматриваю).
      Но основа так или иначе это одна - ОС которая предоставляет основные возможности для базовых операций.


      1. Barabashkad
        14.03.2024 10:29

        я имел ввиду что примитивы С и примитивы ОС это одно и тоже
        С не добаляет не убовляет ничего


  1. diananesterova186
    14.03.2024 10:29
    +1

    Итак, мои выводы: хотя на языке C можно достичь высокой производительности и эффективности при работе с потоками, разработка и отладка подобного кода требует значительных усилий. С другой стороны, Go предоставляет более высокоуровневые средства для работы с параллелизмом, что делает разработку проще и удобнее. В конечном итоге выбор между этими языками зависит от конкретных требований проекта и предпочтений разработчика.


  1. AndreyAf
    14.03.2024 10:29

    Что насчет сравнения потребления памяти в реализации GO по сравнению с Си от 1к потоков например?


    1. anayks
      14.03.2024 10:29

      Чисто с теоретической точки зрения, у Go никаких проблем не будет, потому что там стек горутины (собственно, гошного легковесного потока) 2-8 кбайт в зависимости от версии. И работать это будет поверх реальных потоков, которых будет выделено не больше, чем их доступно в ОС.

      На Си, если я правильно понимаю, два варианта:

      1. Поднимать 1000 потоков, мне кажется, будет крайне жирно, вес будет зависеть от реализации в ОС (1-4 мбайт вроде в зависимости от битности)

      2. Писать самопальные горутины с собственным стеком как в Go поверх реальных потоков, и выйдет то же самое


  1. Kelbon
    14.03.2024 10:29
    +2

    Как я уже писал выше, при обработке нового входящего запроса проще создать новую горутину и поручить обработку запроса ей

    Насколько мне известно, горутины это стекфул корутины, а от сегментированного стека Go в какой-то момент отказался в связи с оверхедом и прочими сложностями, а создание стекфул корутины вполне сравнимо по затратам с созданием целого ОС треда. Нагрузка на шедулер (просто другой) возрастёт, памяти потребляется много, так что я сомневаюсь, что это хорошее решение

    Опишем общую концепцию работы потока в пуле: при получении задания после пробуждения из сна поток захватывает «свой» мьютекс и держит его на протяжении всей обработки запроса. Отпускает лишь в самом конце — и так циклично, на протяжении всего времени работы.

    Зачем? Просто зачем он держит мьютекс не только во время получения задачи, а постоянно всё время работы? Это похоже на неправильное использование мьютекса


    Также ожидал и не увидел самое наивное устройство тредпула - одна очередь с кондваром и ожидающими работы потоками. В этой схеме не нужно искать свободного - он сам найдёт работу, в общем отпадают все поставленные вопросы:

    • как выбирать свободный поток в пуле, готовый обработать запрос,

    • как пробудить поток и сообщить ему о новом запросе на обработку,

    • что если на очередном запросе не будет свободных потоков в пуле.


    Конечно это далеко неидеальный тредпул, но я не вижу недостатков по сравнению с предложенным в статье. Плюс можно рассмотреть дальнейшие варианты с несколькими очередями и разными стратегиями выбора потока и тд (https://github.com/kelbon/kelcoro/blob/main/include/thread_pool.hpp). Тут уже добавляет своих проблем странное ограничение на запрет С++, при том что всё апи линукса почему-то разрешено


    1. falconandy
      14.03.2024 10:29

      Насколько мне известно, горутины это стекфул корутины, а от сегментированного стека Go в какой-то момент отказался в связи с оверхедом и прочими сложностями, а создание стекфул корутины вполне сравнимо по затратам с созданием целого ОС треда. Нагрузка на шедулер (просто другой) возрастёт, памяти потребляется много, так что я сомневаюсь, что это хорошее решение

      Вот замеры для Linux - Goroutines Are Not Significantly Smaller Than Threads и комментарии к ним


      1. Kelbon
        14.03.2024 10:29

        потребление памяти у стекфул корутин и тредов одинаковое, можно изменить размер стека и у корутин просто стек по дефолту поменьше. Насколько я вижу реализации использующие стекфул корутины по той или иной причине скатываются к просто созданию ещё одной целой корутины на запрос, что по итогу эквивалентно одному треду на запрос от которого они хотели спасти свой код (только всё стало гораздо сложнее + свой шедулер)

        Конкретно я говорю про Go и userver, я не отрицаю что можно сделать хорошо, только вот люди не хотят думать и писать хороший код


  1. nikolz
    14.03.2024 10:29
    +1

    "Создавать потоки в Linux даже на С, используя glibc, несложно.."

    --------------------

    Создавать потоки в Windows на С еще проще. Для этого нужна всего одна функция QueueUserWorkItem , которая помещает рабочий элемент(функцию пользователя) в очередь рабочего потока в пуле потоков, который создает OС. ttps://learn.microsoft.com/en-us/windows/win32/procthread/thread-pooling

    https://learn.microsoft.com/ru-ru/windows/win32/procthread/thread-pools


  1. rukhi7
    14.03.2024 10:29

    А вы не рассматривали вопрос насколько эффективно иметь в пул-потоков количество потоков большее (или меньшее) чем количество физических процессоров (аппаратных ядер) в системе?

    Как выбирается/чем определяется количество доступных потоков в пуле?