В предыдущей статье я рассмотрел различные способы организации стековых корутин в языке Си. Сегодня рассмотрим вопросы, связанные с написанием стековых сопрограмм на С++, и создадим на их основе tcp сервер, обрабатывающий запросы от клиентов на основе опроса событий с использованием API мультиплексированного ввода - вывода epoll. Данная тема, на мой взгляд, является ключевой для понимания того, как функционируют современные серверные приложения, написанные при помощи таких библиотек как Boost Asio.

Стековая сопрограмма на С++

Я рассматривал 3 способа организации корутин на языке Си:

  1. использование stejmp/longjmp;

  2. контекстные функции getcontext, makecontext и swapcontext;

  3. создание ассемблерной функции переключения контекста.

Для написания сопрограмм на C++ первый способ не подходит, так как longjmp выполняет переход в точку, созданную setjmp, без вызова деструкторов для локальных объектов. В данной статье рассмотрим использование контекстных функций. Третий способ оставим в качестве темы следующих публикаций.

Начнем с конструктора. Он будет принимать функцию, реализующую логику работы сопрограммы, её идентификатор и размер выделяемого стека. Для простоты будем считать, что в конструктор передается лямбда функция без параметров, которые, при необходимости, можно разместить в списке захвата. Конструктор имеет следующий вид.

Task(std::function<void (Task &)> func_arg, uint32_t cor_id, size_t stack_size = DefaultStackSize)
    :stack{new unsigned char[stack_size]}, 
     func{func_arg}, 
     id{cor_id}
{
  getcontext(&callee);
  callee.uc_link = &caller;
  callee.uc_stack.ss_size = stack_size;
  callee.uc_stack.ss_sp = stack.get();
  makecontext(&callee, reinterpret_cast<void (*)()>(&TaskCall), 1, this);
}

Вначале выделяется память под стек с использованием unique_ptr. Далее инициализируется текущий контекст callee вызовом getcontext. Для него задается область памяти под стек присвоением callee.uc_stack.ss_sp указателя, выделенного для unique_ptr. Кроме того, задается размер стека ss_size равным stack_size, а также адрес того контекста (caller), на который произойдет переключение после завершения выполнения callee. Далее с callee связывается функция TaskCall, которая будет выполнена при переключении контекста с caller на callee. Ей в качестве параметра передается указатель this текущего объекта Task.

static void TaskCall(Task* pcoro)
{
    pcoro->status = task_status::Task_RUNNING;
    pcoro->func(*pcoro);
    pcoro->status = task_status::Task_FINISHED;
}

В теле функции TaskCall вызывается основная функция корутины. В качестве параметра ей передается объект сопрограммы. Предварительно статус сопрограммы меняется на Task_RUNNING. Это необходимо для ее корректной обработки планировщиком. После завершения основной функции статус становится Task_FINISHED, что необходимо для удаления сопрограммы из списка планирования.

При вызове сопрограммы происходит переключение контекста, приводящее к началу или продолжению выполнения TaskCall.

void operator()()
{
  if(status == task_status::Task_FINISHED) 
    return;
  swapcontext(&caller, &callee);
}

Метод Yield переводит корутину в состояние Task_WAITING и передает управление от сопрограммы планировщику.

void Yield()
{
  status = task_status::Task_WAITING;
  swapcontext(&callee, &caller);
  RethrowException();
}

Так как сопрограмма ориентирована на обработку внешних событий, которые могут завершиться с ошибкой, то планировщик оповещает корутину об их возникновении с помощью исключений, вызывая SetException c параметром типа exception_ptr. Когда планировщик переключится снова в контекст корутины ее выполнение продолжится с вызова RethrowException, выполняющегося после swapcontext. RethrowException осуществляет выброс исключения в случае, если оно возникло.

void RethrowException()
{
  if(!exception)
    return;

  std::exception_ptr e = GetException();
  SetException(nullptr);
  std::rethrow_exception(e);
}

Для продолжения работы сопрограммы планировщик вызывает метод AllowResume, который меняет её статус на Task_RUNNING, что позволяет возобновить выполнение.

Каковы ограничения класса Task?

Во-первых, каждая сопрограмма имеет свой собственный стек, который не может использоваться совместно несколькими объектами этого класса. Следовательно, Task не является копируемым.

Во-вторых, в конструкторе Task вызывается makecontext с параметром this. Этот параметр используется для модификации контекста и, скорее всего, запоминается где-то в его структуре данных. Если попытаться выполнить перемещение для объекта класса Task и вызвать метод TaskCall на новом объекте, то произойдет аварийное завершение программы из-за того что используется this того объекта из которого выполнили перемещение. Для того чтобы проблема не возникала, Task сделан не перемещаемым.

Планировщик

Для эффективного доступа к сопрограммам планировщик использует неупорядоченный словарь task_map типа std::unordered_map<uint32_t, Task>. В качестве ключа используется уникальный номер сопрограммы, который присваивается ей при создании в методе CreateTask. В качестве значения выступает объект корутины (Task).

template<typename Func>
void Scheduler::CreateTask(Func&& func) 
{
    task_map.emplace(std::piecewise_construct,
        			 std::forward_as_tuple(id),
        			 std::forward_as_tuple(std::forward<Func>(func), id));
	id++;
}

Так как Task не копируется и не перемещается, то он создается внутри контейнера с использованием метода emplace.

Основным методом планировщика является RunTasks. В нем запускаются/возобновляют выполнение сопрограммы (метод ProcessTasks), а также происходит обработка событий (ProcessEvents).

void Scheduler::RunTasks(void)
{
	while(task_map.size() > 0)
	{
		try
		{ 
			ProcessTasks();
			ProcessEvents();
		}
		catch (const std::exception& e)
        {
            std::cerr << "Exception in Scheduler::RunTasks: " << e.what() << std::endl;
		}
		catch (...)
        {
            std::cerr << "Unknown exception in Scheduler::RunTasks !!!" << std::endl;
        }
	}
	std::cout << "Finished RunTasks!" << std::endl;
}

Так как в результате работы этих методов могут возникнуть исключения, то в RunTasks происходит их перехват. Для того чтобы не усложнять код информация об исключении просто выводится на экран. В реальном проекте исключения должны как минимум логироваться.

void Scheduler::ProcessTasks()
{
	for(auto curr_Task = task_map.begin(); curr_Task != task_map.end(); )
	{
		if((curr_Task->second.GetStatus() != task_status::Task_RUNNING) && (curr_Task->second.GetStatus() != task_status::Task_CREATED))
		{
			curr_Task++;
			continue;
		}

		curr_Task->second();
		if(curr_Task->second.GetStatus() == Task_FINISHED)
			curr_Task = task_map.erase(curr_Task);
		else
			curr_Task++;
	}
}

В методе ProcessTasks обходим task_map, хранящий сопрограммы, и запускаем или продолжаем выполнять те, которые находятся в состоянии "создано" или "выполняется". Если сопрограмма завершила свою работу она удаляется из task_map.

void Scheduler::ProcessEvents()
{
	auto events = poller->Poll();
    for(const auto& evt:events)
	{
		auto iter = task_map.find(evt.coro_id);
		if(iter == task_map.end())
			throw std::runtime_error("coro is absent in scheduler map");
	
		if(evt.event == DescriptorEvents::Error)
		{
			std::runtime_error specific_error("poller exception");
        	std::exception_ptr exptr = std::make_exception_ptr(specific_error);
			iter->second.SetException(exptr);
		}
		iter->second.AllowResume();
	}
}

В ProcessEvents получаем вектор(возможно пустой) произошедших событий вызовом метода Poll. Как работает этот метод рассмотрим далее. Событие описывается следующей структурой.

struct PollResult
{
	unit32_t coro_id;
	DescriptorEvents event;
    PollResult(uint32_t id, DescriptorEvents op) : coro_id(id), event(op) {}
};

Здесь coro_id - уникальный идентификатор сопрограммы. Event может принимать следующие значения: Accept (для серверного сокета), Read и Write (для клиентского), Error (для сигнализации об ошибке).

После получения событий в методе ProcessEvents ищем сопрограмму по значению coro_id. Далее анализируем значение event. Если произошла ошибка, то создаем объект исключения и передаем его сопрограмме вызовом SetException. Это исключение будет выброшено при возобновлении сопрограммы. Если ошибок не возникло, то вызовом AllowResume говорим планировщику, что корутину можно возобновить.

Теперь давайте посмотрим как можно использовать классы Task и Scheduler на примере реальной задачи создания однопоточного tcp сервера. Так как материал статьи, в первую очередь, ориентирован на начинающих, то далее приведу необходимы теоретический минимум, необходимый для понимания темы.

Основы клиент серверного взаимодействия

Для начала давайте вспомним, как осуществляется клиент серверное взаимодействие на основе сокетов. Согласно Википедии, сокет — название программного интерфейса для обеспечения обмена данными между процессами. Процессы при таком обмене могут исполняться как на одной, так и на различных ЭВМ, связанных между собой сетью. Сокет — абстрактный объект, представляющий конечную точку соединения. Следует различать клиентские и серверные сокеты. В процессе обмена используется два сокета: отправителя и получателя. Каждый процесс может создать «слушающий» (серверный) сокет и привязать его к порту операционной системы. Слушающий процесс обычно находится в цикле ожидания, просыпаясь при появлении нового соединения. Клиент устанавливает соединение со слушателем (сервером), после чего любое чтение или запись осуществляется через файловый дескриптор, связанный с сокетом.

Сокет создается при помощи системного вызова socket(). Вся дальнейшая работа с ним выполняется с помощью дескриптора, возвращенного данным вызовом: fd = socket(domain, type, protocol).
Современные операционные системы поддерживают как минимум домены следующих видов: AF_UNIX, AF_INET, AF_INET6. В качестве домена будем использовать AF_INET. Существует как минимум два вида сокетов: потоковые и датаграммные. Потоковые сокеты (SOCK_STREAM) предоставляют надежный, двунаправленный канал взаимодействия на основе байтового потока. Они используют протокол TCP/IP.

Для работы с сокетами существуют следующие системные вызовы:

  • socket() создает новый сокет;

  • bind() привязывает сокет к адресу. Обычно он используется сервером для привязки к общеизвестному адресу, известному клиентам;

  • listen() позволяет потоковому сокету принимать входящие соединения;

  • accept() принимает соединение от клиентского приложения в «слушающий» потоковый сокет и опционально возвращает адрес клиента;

  • connect() устанавливает соединение с другим сокетом.

Ввод/вывод через сокеты может быть выполнен с помощью традиционных операций read() и write() или же специальных системных вызовов (send(), recv(), sendto() и recvfrom()).

Важно помнить, что по умолчанию эти вызовы блокируют поток выполнения программы, если соответствующую операцию нельзя выполнить немедленно.

Потоковые сокеты часто делят на активные и пассивные. Активный сокет создается при помощи вызова socket(). Его можно использовать в вызове connect(), чтобы установить соединение с пассивным сокетом. Пассивным называется сокет, созданный в результате вызова listen(). Он принимает входящие соединения. Взаимодействие между клиентом и сервером можно проиллюстрировать при помощи следующей схемы.

Схема клиент серверного взаимодействия
Схема клиент серверного взаимодействия

Код сервера начинается с создания сокета, который затем привязывается к некому общеизвестному адресу вызовом bind(). Вызов listen() создает серверный сокет, принимающий входящие соединения при помощи accept(). Как только соединение установлено, сервер читает запрос от клиента, обрабатывает его и посылает ответ. Запрос клиента читается через сокет, возвращаемый accept. C этим сокетом связывается обработчик запросов клиента.

Клиент после создания сокета осуществляет соединение с сервером при помощи connect(). Далее он отправляет запросы вызовом write() и получает ответы при помощи read().

Создание TCP сервера в Unix подобных ОС

Организовать TCP-сервер можно как минимум двумя способами, которые различаются по эффективности и сложности реализации.

Первый способ состоит в порождении нового процесса для каждого запроса при помощи вызова fork(). Родительский процесс ожидает соединения через accept(). Как только соединение принято, вызывается функция fork(), создающая дочерний процесс, являющийся точной копией родительского, который и обрабатывает клиентские запросы.
Если дочерний процесс аварийно завершается, это не влияет на другие процессы, что является несомненным плюсом.
В качестве основного минуса следует отметить высокие накладные расходы на создание нового процесса. Также могут возникнуть трудности с масштабированием. Система может исчерпать ресурсы при большом количестве одновременных подключений.

Второй, более современный, способ заключается в запуске нового потока для обслуживания нового клиента при помощи pthread_create. Основной поток также ожидает соединения через accept(). При установлении соединения запускается новый поток (pthread_create()), который обрабатывает клиентские запросы.
Такой подход связан с меньшими накладными расходами. Создание потока гораздо быстрее и требует меньше ресурсов, чем создание процесса. Однако при большом количестве подключений все равно могут возникнуть проблемы с масштабированием. Обмен данными становится проще так как потоки разделяют общее адресное пространство. При этом могут возникнуть проблемы с синхронизацией. Ошибка в одном потоке может привести к аварийному завершению всего процесса.

Для повышения скорости обработки клиентских запросов могут применяться различные оптимизации.

Например, вместо создания нового потока или процесса для каждого клиента сервер во время запуска создает определенное количество потомков (потоков или процессов). Они формируют так называемый серверный пул. Каждый элемент пула обслуживает одного клиента за раз. Когда обслуживание окончено, потомок не завершается, а переходит к обслуживанию следующего клиента.

Еще одна явная оптимизация связана с тем, что значительная часть времени, потраченного на обработку клиентских запросов, связана с ожиданием данных. Сюда входит ожидание подключения клиента, ожидание ответа от БД, запросов внешним сервисам и т.д. Чтобы не занимать поток на время ожидания используются API мультиплексированного ввода-вывода. Такие API служит основой для применения сопрограмм, поэтому рассмотрим их подробнее.

API мультиплексированного ввода-вывода

Кратко рассмотрим 3 API: select, poll и epoll. Они используются для решения одной и той же задачи: мультиплексирования ввода-вывода. Позволяют программе эффективно отслеживать активность множества файловых дескрипторов (сокетов, файлов, каналов) и определять, какие из них готовы к чтению, записи или имеют ошибки, которые необходимо обработать.

Важной особенностью этих API является то, что они не блокируют выполнение программы. При этом они не занимается непосредственным вводом-выводом, а лишь сообщают о готовности файловых дескрипторов. Для выполнения чтения или записи нужно использовать дополнительные системные вызовы, такие как read, write.

Без использования select, poll или epoll, код, обрабатывающий множество клиентов, столкнулся бы с проблемой: если вызвать read() на сокете у которого нет информации для чтения, то поток, выполнивший вызов, заблокируется в ожидании данных и не сможет обслуживать другие активные сокеты.

Дальнейшее описание select, poll и epoll основано на материале, приведенном в книге Linux API. Исчерпывающее руководство. Автор Керриск Майкл.

Select

int select(int n, fd_set readfds, fd_set writefds, fd_set exceptfds, struct timeval timeout);
Позволяет программе отслеживать множество файловых дескрипторов, ожидая, пока один или несколько не станут готовыми к определённому классу операций ввода-вывода.
Отслеживаются три независимых набора дескрипторов.

Дескрипторы, перечисленные в readfds, будут отслеживаться на предмет доступности данных для чтения. fd_set — это буфер фиксированного размера. Дескрипторы в writefds отслеживаются на предмет доступности для записи (хотя запись большого объёма данных всё равно может привести к блокировке). Дескрипторы в exceptfds отслеживаются на предмет исключительных ситуаций.
Аргумент timeout указывает интервал, в течение которого функция select() должна блокироваться, ожидая готовности файлового дескриптора. Вызов будет блокироваться до тех пор, пока:
1. файловый дескриптор не станет готовым;
2. вызов не будет прерван обработчиком сигнала;
3. не истечёт время ожидания.

Каждый раз при вызове select() приходится копировать все три набора дескрипторов из пользовательского пространства в пространство ядра и обратно. Это довольно дорогая операция. Также select имеет жесткое ограничение на максимальное количество дескрипторов (обычно 1024), определяемое константой FD_SETSIZE.

Poll

int poll(struct pollfd *fds, nfds_t nfds, int timeout); Решает ту же задачу, что и select, но набор отслеживаемых файловых дескрипторов указывается в аргументе fds, который представляет собой массив структур следующего вида:

 struct pollfd 
 {
  int fd; /* файловый дескриптор*/ 
  short events; /* запрошенные события */ 
  short revents; /* возвращенные события */
 };

Вызывающий код должен указать количество элементов, содержащихся в fds, при помощи параметра nfds. Поле fd содержит файловый дескриптор. Поле events — это входной параметр, представляющий собой битовую маску. Она указывает события для файлового дескриптора fd, интересующие приложение. События задаются в виде отдельных битовых флагов или их комбинаций. Например, могут быть указаны POLLIN (запрос возможности чтения из сокета или канала), POLLOUT (запрос записи) и другие.

Поле revents — это выходной параметр, заполняемый ядром событиями, которые фактически произошли для заданного дескриптора. Биты, возвращаемые в revents, могут включать любые из указанных в events, или одно из значений POLLERR, POLLHUP, POLLNVAL, свидетельствующих об ошибке или закрытии дескриптора.
Аргумент timeout указывает количество миллисекунд, в течение которых poll() должен блокироваться в ожидании готовности дескриптора.

Poll использует один массив структур pollfd, который передается в ядро и обратно. Это эффективнее, чем три массива у select. К тому же нет жесткого ограничения на количество дескрипторов.
При этом все еще требуется сканирования всего списка дескрипторов при каждом вызове.
Для использования select или poll приложение должно передать в ядро полный список всех файловых дескрипторов, готовности которых оно ожидает. Ядро, в свою очередь, должно для каждого из переданных элементов проверить состояние дескрипторов и сформировать ответ, описывающий состояние каждого переданного дескриптора.


Проблемой select и poll является то, что процессорное время, которое уходит на выполнение этих вызовов, прямо пропорционально количеству отслеживаемых дескрипторов, что может привести к проблемам с масштабированием. Для проверки одного и того же набора выполняются циклические вызовы, но ядро не запоминает его содержимое. Поэтому массив дескрипторов нужно передавать снова и снова.

Epoll

Интерфейс epoll позволяет ядру записывать набор файловых дескрипторов, в которых заинтересован процесс. Это устраняет проблемы с масштабированием, присущие вызовам
select() и poll(), и обеспечивает производительность, зависящую не от количества
отслеживаемых дескрипторов, а от частоты событий ввода-вывода. Следовательно, интерфейс epoll являются более предпочтительным в ситуациях, когда дескрипторов слишком много.

В главе 59 книги Майкла Керриска приводятся сравнение времени затрачиваемого на 100 000 операций мониторинга с помощью poll(), select() и интерфейса epoll. При количестве
отслеживаемых дескрипторов равном 1000, select и poll тратят по 35 секунд, в то время как epoll - всего 0,53 секунды.

Программный интерфейс epoll поддерживается только в Linux версии 2.6 и выше. Главной структурой данных в этом интерфейсе является экземпляр epoll. Доступ к нему происходит через дескриптор открытого файла, который можно получить вызовом функции epoll_create1(). Когда необходимость в этом дескрипторе пропадает, его следует закрыть стандартным способом — с помощью вызова close(). Дескриптор не используется для ввода-вывода, а является ссылкой на структуру данных ядра, служащей двум целям:

  • запись списка файловых дескрипторов, в отслеживании которых заинтересован текущий процесс(список интереса);

  • предоставление списка файловых дескрипторов, готовых к вводу-выводу(список
    готовности).

Второй список дескрипторов является подмножеством первого. Системный вызов epoll_ctl() изменяет список интереса экземпляра epoll, на который ссылается файловый дескриптор epfd. Он имеет сигнатуру: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev).

epfd - дескриптор, возвращаемый вызовом epoll_create1(). Аргумент fd обозначает файловый дескриптор из списка интереса, параметры которого нужно изменить. Он может ссылаться на именованный канал, очередь FIFO, сокет и некоторые другие системные объекты.

Аргумент op обозначает операцию, которую нужно выполнить. Он может принимать одно из следующих значений:

  • EPOLL_CTL_ADD — добавляет файловый дескриптор fd в список интереса экземпляра epfd. Набор отслеживаемых событий для этого дескриптора задается в виде буфера, на какой указывает аргумент ev. При попытке добавления дескриптора, уже находящегося в списке интереса, вызов epoll_ctl() завершается ошибкой EEXIST;

  • EPOLL_CTL_MOD — изменяет параметры событий для дескриптора fd, используя информацию в буфере, на который указывает аргумент ev. Если попытаться изменить параметры дескриптора, не входящего в список интереса, то вызов epoll_ctl() завершится ошибкой ENOENT;

  • EPOLL_CTL_DEL — удаляет файловый дескриптор fd из списка интереса для экземпляра epfd. Аргумент ev при этом игнорируется. При попытке удалить файловый дескриптор, не входящий в список интереса, вызов epoll_ctl() завершится ошибкой ENOENT. Закрытие файлового дескриптора автоматически удаляет его из всех списков интереса epoll, в которые он входит.

Аргумент ev является указателем на структуру типа epoll_event следующего вида:

struct epoll_event 
{
  uint32_t events; /* События epoll (битовая маска) */
  epoll_data_t data; /* Пользовательские данные */
};

Поле data структуры epoll_event выглядит так:

typedef union epoll_data 
{
  void *ptr;    /* Указатель на пользовательские данные */
  int fd;       /* Файловый дескриптор */
  uint32_t u32; /* 32-разрядное целое число */
  uint64_t u64; /* 64-разрядное целое число */
} epoll_data_t;

Вложенное поле events представляет собой битовую маску, определяющую набор интересующих нас событий для дескриптора fd, подобную описанной для poll. Вложенное поле data является объединением, где один из элементов можно задействовать для описания информации, которая должна возвращаться вызывающему процессу в случае готовности дескриптора fd. Рассмотрим его подробнее в разделе с кодом.

Системный вызов epoll_wait() возвращает информацию о готовых файловых дескрипторах из экземпляра epoll, на который указывает дескриптор epfd. Его сигнатура имеет следующий вид: int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout).
За одно выполнение этот вызов может вернуть сведения о множестве готовых дескрипторов в виде массива структур epoll_event, на который указывает аргумент evlist.

Массив evlist выделяется вызывающим процессом, а количество содержащихся в нем элементов определяется с помощью аргумента maxevents. Каждый элемент массива evlist возвращает информацию об отдельном готовом файловом дескрипторе. Вложенное поле events содержит маску событий, произошедших с дескриптором. Во вложенном поле data хранится значение, которое было указано внутри ev.data при регистрации нашей заинтересованности в этом конкретном дескрипторе вызовом epoll_ctl(). Стоит отметить, что поле data всего лишь позволяет найти дескриптор, связанный с текущим событием.

Аргумент timeout определяет поведение вызова epoll_wait(), связанное с блокировкой. Аналогично poll.

Epoll позволяет работать в двух режимах:

  • по уровню — файловый дескриптор с событием возвращается, только если с момента последнего вызова epoll_wait произошли новые события (например, пришли новые данные);

  • по фронту — дескриптор возвращается, если есть непрочитанные или записанные данные.

Если приложение прочитало из файлового дескриптора только часть доступных для чтения данных, то при следующем вызове:

  • в режиме срабатывания по фронту файловый дескриптор не будет возвращён до тех пор, пока в дескрипторе не появятся новые данные;

  • в срабатывания по уровню дескриптор будет возвращаться до тех пор, пока не прочитаны все «старые» данные(и новые, если таковые придут).

Ввиду того что epoll обладают более высокой производительностью по сравнению с select и poll, будем рассматривать именно его в режиме срабатывания по фронту(edge-triggered).

В таком режиме epoll обычно применяется с неблокирующими файловыми дескрипторами, потому что уведомления приходят только при появлении новых событий и, как мне кажется, позволяет добиться более высокой производительности приложения.

Если файловый дескриптор блокирующий, то возможно зависание, так как программа ожидает, что данные будут доступны, но при этом не сможет многократно получить уведомления без явного чтения всех данных. Неблокирующий дескриптор позволяет при срабатывании обработать все доступные данные без блокировок, избегая потери событий.

Алгоритм использования интерфейса epoll имеет следующий вид:

  1. Сделать все дескрипторы, которые нужно отслеживать, неблокирующими;

  2. Сформировать список интереса epoll с помощью вызова epoll_ctl();

  3. Обрабатывать события ввода-вывода в цикле:

    3.1 Извлекать список готовых дескрипторов, используя epoll_wait();

    3.2 Выполнять ввод-вывод для каждого готового дескриптора, пока соответствующий системный вызов (например, read(), write(), recv(), send() или accept()) не вернет ошибку EAGAIN или EWOULDBLOCK.

Теперь давайте рассмотрим как реализовать опрос событий в коде на примере класса EPoller. Он позволяет для заданного неблокирующего файлового дескриптора создавать события принятия подключения клиента (AddAcceptEvent), чтения (AddReadEvent) и записи (AddWriteEvent). Также можно менять маску отслеживаемых событий дескриптора. Например, добавлять событие записи к дескриптору у которого отслеживается чтение (AppendWriteEvent). Можно удалять событие записи, связанное с дескриптором (RemoveWriteEvent). Это сделано для того чтобы epoll не генерировал событие готовности дескриптора на запись, когда они не нужны. Метод Poll позволяет получить массив событий, произошедших на отслеживаемых дескрипторах.

В конструкторе EPoller вызовом epoll_create1 создается новый экземпляр (объект ядра) epoll и возвращается файловый дескриптор epoll_descriptor, ссылающийся на этот экземпляр.

EPoller::EPoller()
{
    epoll_descriptor = epoll_create1(epoll_flags);
    if(epoll_descriptor ==  invalid_handle) 
    {
        throw std::system_error(errno, std::generic_category(), epoll_create_error);
    }
    events_hapenned.resize(max_epoll_events);
}

Создание события чтения происходит в функции CreateReadEvent.

epoll_event CreateReadEvent(int fd, uint32_t coro_id)
{
    struct epoll_event event;
    event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
    event.data.u64 =  PackIdFd(fd, coro_id);

    return event;
}

В нее передается дескриптор сокета fd, а также идентификатор сопрограммы coro_id. Эта информация нужна для того чтобы когда epoll вернет событие, произошедшее на данном дескрипторе, мы смогли определить какой сопрограмме его передать. CreateReadEvent упаковывает fd и coro_id в одну переменную event.data.u64 вызовом PackIdFd для последующей передачи в epoll. Переменная event.events инициализируется битовыми флагами, символизирующими интересующие нас события:

  • EPOLLIN - задает событие чтения из дескриптора;

  • EPOLLERR - указывает на то, что на отслеживаемом дескрипторе произошла ошибка;

  • EPOLLHUP - сигнализирует о том, что удаленная сторона (клиент в TCP-соединении) закрыла соединение;

  • EPOLLET - задает режим срабатывания epoll по фронту.

Задание флагов EPOLLERR, EPOLLHUP не является обязательным и использовано здесь для наглядности. Соответствующие события всегда приходят при возникновении, независимо от того, указаны они в events или нет.

Умея создавать событие чтения, можно написать функцию добавления его в очередь.

void EPoller::AddReadEvent(int fd, uint32_t coro_id)
{
    auto event = CreateReadEvent(fd, coro_id);
    AddEpollEvent(fd, event);
}

Функция AddEpollEvent добавляет событие в очередь, вызывая epoll_ctl. За добавление отвечает флаг EPOLL_CTL_ADD. Кроме него передается epoll_descriptor, а также дескриптор сокета, на котором отслеживаются события, и адрес структуры epoll_event.

void EPoller::AddEpollEvent(int fd, epoll_event& event)
{
    if(epoll_ctl(epoll_descriptor, EPOLL_CTL_ADD, fd, &event) < 0)
    {
        throw std::system_error(errno, std::generic_category(), epoll_add_desc_error);
    }
}

Необходимо иметь в виду, что если мы попытаемся вызвать epoll_ctl, повторно передав уже зарегистрированный в epoll дескриптор, то функция вернет -1 и переменная errno будет установлена в значение EEXIST.

Теперь рассмотрим вопрос стоит ли одновременно задавать флаги чтения(EPOLLIN) и записи(EPOLLOUT) для сокета? Я считаю, что хотя это и возможно, в серверных приложениях лучше использовать следующий подход:

  • Изначально указывать только EPOLLIN, так как сокет может быть готов к записи сразу после установки соединения. Если сразу добавить отслеживание EPOLLOUT, то вызов epoll_wait() может немедленно вернуть событие EPOLLOUT, что приведет к ненужному срабатыванию;

  • Отслеживать EPOLLOUT только при необходимости. Включать этот флаг только тогда имеются данные для отправки клиенту;

  • Как только данные отправлены, необходимо удалить флаг EPOLLOUT из мониторинга с помощью вызова epoll_ctl(EPOLL_CTL_MOD, ...), пока не появятся новые данные для отправки.

Такая динамическая модификация флагов позволит предотвратить сценарий, когда ядро постоянно сигнализирует о готовности к записи, но серверу нечего отправлять.

Для того чтобы добавить EPOLLOUT для дескриптора, который уже отслеживает чтение, необходимо создать событие, содержащее оба этих флага, например так:

epoll_event CreateReadWriteEvents(int fd, uint32_t coro_id)
{
    struct epoll_event event;
    event.events = EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
    event.data.u64 = PackIdFd(fd, coro_id);;

    return event;
}

После этого вызывать модификацию дескриптора в epoll:

void EPoller::ModifyEpollEvent(int fd, epoll_event& event)
{
    if (epoll_ctl(epoll_descriptor, EPOLL_CTL_MOD, fd, &event) < 0)
        throw std::system_error(errno, std::generic_category(), epoll_modify_desc_error); 
}

Метод ModifyEpollEvent вызывает ту же функцию epoll_ctl, но с флагом EPOLL_CTL_MOD, говорящем о необходимости поменять отслеживаемые события, связанные с дескриптором fd. Вместо вызова двух методов можно использовать один:

void EPoller::AppendWriteEvent(int fd, uint32_t coro_id)
{
    auto event = CreateReadWriteEvents(fd, coro_id);
    ModifyEpollEvent(fd, event);
}

Для удаление флага EPOLLOUT используется метод RemoveWriteEvent.

void EPoller::RemoveWriteEvent(int fd, uint32_t coro_id)
{
    auto event = CreateReadEvent(fd, coro_id);
    ModifyEpollEvent(fd, event);
}

Таким образом, нельзя просто так взять и сбросить отдельный флаг события. Нужно создать экземпляр epoll_event, оставить в нем только необходимые флаги, и передать epoll.

Наконец, рассмотрим метод Poll, осуществляющий опрос событий. Он использует функцию epoll_wait.

std::vector<PollResult> EPoller::Poll(int timeout_milliseconds)
{
    std::vector<PollResult> result;
    int nfds;
    if ((nfds = epoll_wait(epoll_descriptor, &events_hapenned[0], events_hapenned.size(), timeout_milliseconds)) < 0) 
    {
        if (errno == EINTR) 
        {
            return result;
        }
        throw std::system_error(errno, std::generic_category(), epoll_wait_error);
    }

    for (int i = 0; i < nfds; ++i) 
    {
        auto [coro_id, fd] = unPackIdFd(events_hapenned[i].data.u64);
        uint32_t event_flags = events_hapenned[i].events;

        //Проверка на ошибки или отключение
        if (event_flags & (EPOLLERR | EPOLLHUP)) 
        {
            // Ошибка на дескрипторе или удаленный конец закрыл соединение
            epoll_ctl(epoll_descriptor, EPOLL_CTL_DEL, fd, NULL); // Удаляем из epoll
            result.emplace_back(coro_id, DescriptorEvents::Error);
            continue; // Переходим к следующему событию
        }
        //Проверка на готовность к чтению (самое распространенное событие)
        if (event_flags & EPOLLIN) 
        {
            //Если это серверный сокет (слушающий), значит пришло новое соединение
            if (fd == accept_descriptor) 
            {
                //Принять новое соединение (и добавить его в epoll_fd)
                result.emplace_back(coro_id, DescriptorEvents::Accept);
            } 
            else 
            {
                //Это клиентский сокет, значит можно читать данные
                result.emplace_back(coro_id, DescriptorEvents::Read);
            }
        }
        else
        {
            //Проверка на готовность к записи
            if (event_flags & EPOLLOUT)
                //Cокет готов принимать исходящие данные
                result.emplace_back(coro_id, DescriptorEvents::Write);
        }
    }

    return result;
}

Epoll_wait принимает в качестве параметров дескриптор epoll, а также вектор events_hapenned, хранящий структуры epoll_event, размер вектора, а также величину timeout_milliseconds, задающую таймаут в миллисекундах в течение которого функция блокируется. В случае если epoll_wait возвращает значение меньше 0, системная переменная errno содержит причину ошибки. Иначе переменная nfds содержит количество событий, которыми заполнен events_hapenned.

Далее в цикле анализируем полученные события (переменная events_hapenned[i].events). В первую очередь, необходимо проверить наличие ошибок (флаги EPOLLERR | EPOLLHUP). Если они возникли, то удаляем дескриптор из множества отслеживаемых epoll вызовом epoll_ctl с флагом EPOLL_CTL_DEL. Далее создаем экземпляр структуры PollResult, который инициализируем идентификатором сопрограммы и флагом ошибки. Планировщик после вызова Poll создаст экземпляр исключения и передаст его сопрограмме, которая должна будет его обработать после возобновления.

Важно! Сопрограмма должна обработать исключение, иначе программа аварийно завершится. Исключение, возникшее внутри корутины, или присланное извне не распространяется за пределы стека сопрограммы в вызывающий код.

Если ошибок не возникло, то проверяем событие чтения. Если флаг EPOLLIN установлен на серверном сокете, это означает подключение клиента. Если сокет не серверный, то пришли данные от клиента.

Если нет флага чтения, то проверяем флаг записи. Если он установлен, то создаем соответсвующее событие.

Работа с сокетами в неблокирующем режиме

Для создания сокета используется класс NonBlockSocket.

NonBlockSocket::NonBlockSocket(int domain, int type)
{
	InitFileDescriptor(CreateSocket(domain, type));
}

Он создает сокет в конструкторе и переводит его в неблокирующий режим.

int NonBlockSocket::CreateSocket(int domain, int type)
{
    int s = socket(domain, type, 0);
    if (s == -1) {
        throw std::system_error(errno, std::generic_category(), "socket");
    }
   return SetupNonblockingMode(s);
}

Если сокет создается с ошибкой, то выбрасываем исключение.

int NonBlockSocket::SetupNonblockingMode(int s)
{
    int value = 1;
    socklen_t len = sizeof(value);
    
    if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (char*) &value, len) < 0) 
        throw std::system_error(errno, std::generic_category(), "setsockopt");    

    auto flags = fcntl(s, F_GETFL, 0);
    if (flags < 0) 
        throw std::system_error(errno, std::generic_category(), "fcntl error getting flags");

    if (fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) 
        throw std::system_error(errno, std::generic_category(), "fcntl error setting flag");

    return s;
}

Для сокета задаем SO_KEEPALIVE и добавляем флаг O_NONBLOCK. После этого системные вызовы, которые не могут завершиться немедленно, не блокируются, а возвращают ошибку.

От NonBlockSocket наследуется класс серверного сокета ServerNonBlockSocket. Его основные методы: Bind, Listen и Accept.

void ServerNonBlockSocket::Bind(SocketAddress address)
{
	address = std::move(address);
	auto [rawaddr, len] = address.RawAddr();
    int optval = 1;
    socklen_t optlen = sizeof(optval);
    if (setsockopt(*fd_ptr, SOL_SOCKET, SO_REUSEADDR, (char*) &optval, optlen) < 0) {
        throw std::system_error(errno, std::generic_category(), "setsockopt");
    }
    if (bind(*fd_ptr, rawaddr, len) < 0) {
        throw std::system_error(errno, std::generic_category(), "bind");
    }
}

В методе Bind задается флаг SO_REUSEADDR для разрешения повторного использования локального адреса и порта. Метод Listen просто вызывает одноименную функцию. Она переводит сокет в пассивный режим, ожидая подключение клиента к адресу и порту, которые были привязаны к нему ранее с помощью функции bind(). Также устанавливается очередь ожидающих соединений (backlog).

Метод AsyncAccept используется для приёма входящих соединений.

std::vector<NonBlockRWSocket> ServerNonBlockSocket::AsyncAccept(Task& coro)
{
	std::vector<NonBlockRWSocket> res;
	char clientaddr[sizeof(sockaddr_in)];
    socklen_t len = static_cast<socklen_t>(sizeof(sockaddr_in));
	bool hasData = true;
  
	coro.Yield();
    do
    {            
        //Вызываем accept() на неблокирующем сокете
        int conn_sock_fd = accept(*fd_ptr, reinterpret_cast<sockaddr*>(&clientaddr[0]), &len);
            
        if (conn_sock_fd == -1) 
		{
            if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
                // Все ожидающие соединения приняты. Выходим из цикла accept().
				hasData = false;
                break; 
			}
            else
                throw std::system_error(errno, std::generic_category(), "async accept");
        }            
        //Успешно приняли новое соединение.
        //conn_sock_fd — это новый сокет для клиента.
		res.emplace_back(SocketAddress{reinterpret_cast<sockaddr*>(&clientaddr[0])}, conn_sock_fd);
	}while(hasData);

	return res;
}

Как видно из сигнатуры, метод принимает сопрограмму в качестве входного параметра. Так как момент подключения очередного клиента к серверу неизвестен, то планировщик может использовать это время для выполнения других сопрограмм. Поэтому в начале метода Listen корутина отдает управление планировщику вызовом Yield().

Когда клиент подключится, планировщик возобновит выполнение сопрограммы, которая в цикле будет вызывать accept. При этом возвращается дескриптор "активного" сокета, который используется исключительно для обмена данными с этим конкретным клиентом.

Так как сокет находится в неблокирующем режиме, то при исчерпании очереди клиентов вызов accept не блокируется, а завершится с ошибкой EAGAIN или EWOULDBLOCK. Это нормальная ситуация, свидетельствующая о том, что цикл надо завершить.

AsyncAccept возвращает результат в виде вектора объектов класса NonBlockRWSocket, служащего для обработки клиентских запросов.

Проблема обмена информацией по протоколу TCP

Наш сервер будет получать от клиента команду и отсылать ответ в виде строки. При обмене информацией с сервером по протоколу TCP мы не можем заранее сказать будут ли доставлены пересылаемые данные за один раз или разделены на несколько частей. Перед тем как принимать строку необходимо знать ее длину. Поэтому клиент вначале должен послать серверу длину строки в формате uint32_t размером 4 байт. Сервер прочитав это число будет знать когда завершать операцию чтения строки.

Для чтения length байт из сокета, находящегося в неблокирующем режиме, используем метод AsyncReadBytes.

ssize_t NonBlockRWSocket::AsyncReadBytes(Task& coro, void* buffer, size_t length) 
{   
    char* buf_ptr = static_cast<char*>(buffer);
    size_t total_received = 0;

    while(total_received < length) 
    {
        ssize_t bytes_recvd = read(*fd_ptr, buf_ptr + total_received, length - total_received);
        
        if(bytes_recvd <= 0)
        {
            if(bytes_recvd == 0)
                throw std::system_error(errno, std::generic_category(), conn_closed);

            if(errno != EAGAIN && errno != EWOULDBLOCK)
                throw std::system_error(errno, std::generic_category(), err_read_socket);
            else
                coro.Yield();
        }
        else
            total_received += bytes_recvd;
    }
    return total_received;
}

Принцип его работы состоит в том, чтобы в цикле вызывать функцию read. Вызов не блокируется, возвращаемое значение bytes_recvd может быть положительным, нулевым или отрицательным. Положительное значение говорит о том что от клиента пришли данные, которые мы накапливаем в буфере buf_ptr, используя смещение total_received.

Нулевое значение свидетельствует о том что клиент закрыл соединение. Эта ситуация обрабатывается генерацией исключения, которое должно быть обработано в сопрограмме - обработчике клиента.

Отрицательное значение свидетельствует об ошибке. При этом нужно проанализировать значение errno. Если оно EAGAIN или EWOULDBLOCK, то в сокете пока нет данных для чтения и сопрограмма может отдать управление планировщику вызова Yield. При другом коде ошибки выбрасываем исключение.

Чтение строки из сокета сводится к двум вызовам AsyncReadBytes.

std::string NonBlockRWSocket::AsyncRead(Task& coro)
{
    uint32_t length; 
    ssize_t bytes_read = AsyncReadBytes(coro, &length, sizeof(length));
   
    length = ntohl(length);
    //Теперь используем ту же функцию для чтения ровно length байт данных
    std::vector<char> buffer(length);
    AsyncReadBytes(coro, buffer.data(), length);

    return std::string(buffer.data(), length);
}

Первый считывает размер передаваемой клиентом строки, а второй - саму строку, которая затем возвращается вызывающему коду.

Для записи ответа в сокет используется метод AsyncWriteBytes.

void NonBlockRWSocket::AsyncWriteBytes(Task& coro, const void* buffer, size_t length) 
{   
    const char* buf_ptr = static_cast<const char*>(buffer);
    size_t total_send = 0;

    while(total_send < length) 
    {
        ssize_t bytes_sent = write(*fd_ptr, buf_ptr + total_send, length - total_send);
        
        if(bytes_sent < 0)
        {
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                throw std::system_error(errno, std::generic_category(), err_write_socket);
            else
                coro.Yield();
        }
        else
            total_send += bytes_sent;
    }
}

Его принцип действия совпадает с AsyncReadBytes с той лишь разницей, что вместо вызова read() используется write().

Запись строки в клиентский сокет также осуществляется за два вызова AsyncWriteBytes. Первый отправляет длину строки, а второй - саму строку.

void NonBlockRWSocket::AsyncWrite(const std::string& content, Task& coro)
{
	if(content.empty())
		return;
    // Преобразование в сетевой порядок байтов
    uint32_t length = htonl(content.size());
    AsyncWriteBytes(coro, &length, sizeof(length)); 
    AsyncWriteBytes(coro, content.data(), content.size());
}

Теперь рассмотрим реализацию простого однопоточного TCP сервера с использованием корутин. Исходный код можно посмотреть здесь. Код серверной сопрограммы выглядит следующим образом.

try
  {
    scheduler->CreateTask([](Task& acceptTask)
    {
      try 
        {
    	    ServerNonBlockSocket sock;
	        sock.Bind(SocketAddress(address, port));
	        sock.Listen();
        
            poller->AddAcceptEvent(sock.GetFd(), acceptTask.GetId());
    	    while(true) 
            {
              for(auto&& clientsocket : sock.AsyncAccept(acceptTask))
              {
                scheduler->CreateTask([clientsock = std::move(clientsocket)](Task& client_coro) mutable
                {
                    client_handler(client_coro, clientsock);
                });
              }
            }
       }
       catch(const std::exception& e)
       {
           std::cout << e.what() << std::endl;        
       }
   });
   scheduler->RunTasks();
  } catch(const std::exception& e)
  {
      std::cout << e.what() << std::endl;
  }

Вначале создается неблокирующий серверный сокет, который привязывается к заданному адресу и порту. Далее он передается системе опроса событий вызовом AddAcceptEvent. После этого сокет переходит в режим прослушивания. Как только AsyncAccept вернет клиентский сокет, для него создается новая сопрограмма вызовом CreateTask. Логика обработки клиентского запроса реализована в функции client_handler.

void client_handler(Task& coro, NonBlockRWSocket& socket) 
{
    try 
    {
        poller->AddReadEvent(socket.GetFd(), coro.GetId());
        while(true)
        {
            std::string input = socket.AsyncRead(coro);
            if(input.empty())
                return;

            std::string result;

            if(startsWithSubstring(input, timecommand))
                result = GetTimeString();
            else
            {
                if(startsWithSubstring(input, echocommand))
                    result = ExtractStringAfterPrefix(input, echocommand);
                else
                    result = errorinput;
            }
            
            poller->AppendWriteEvent(socket.GetFd(), coro.GetId());
            socket.AsyncWrite(result, coro);
            poller->RemoveWriteEvent(socket.GetFd(), coro.GetId());
        }
    } catch (const std::exception& ex) 
    {
        std::cout << "Exception: " << ex.what() << std::endl;
    }
}

Клиентский сокет передается системе опроса (AddReadEvent) с целью ожидания прихода команды от клиента. Когда данные поступили, они считываются вызовом AsyncRead. Если пришла пустая строка, то клиент закрыл соединение и можно завершать сопрограмму. В противном случае обработчик пытается распознать команду клиента. В данный момент доступно две команды. Первая используется для получения текущего серверного времени. Для этого клиент должен передать строку time. Вторая - эхо ответ от сервера. Клиент должен передать строку в формате echo: hello world. Если клиент прислал строку, не совпадающую ни с одной из описанных команд, сервер отсылает ему сообщение об ошибке.

Заключение

  • Механизм опроса событий на основе epoll является более предпочтительным в плане производительности, чем select и poll;

  • Системные вызовы select() и poll() не являются эксклюзивными для Linux. Они доступны на большом количестве UNIX-подобных операционных систем, в то время как epoll можно использовать только в linux;

  • Насколько я понял, epoll не может эффективно использоваться при отслеживании событий чтения и записи для обычных дисковых файлов;

  • В других ОС есть аналогичные механизмы, например kqueue в MacOS и iocp в Windows;

  • В следующих статьях планируется рассмотреть вопросы задания тайм-аутов для событий, ожидание сигналов, отмены операций ввода-вывода и т.д.

Тема, затронутая в статье, довольна интересна и обширна. Возможно я что-то упустил или не совсем правильно объяснил. В таком случае прошу высказываться в комментариях.

Исходный код к статье можно посмотреть здесь. Продолжение следует...

Комментарии (0)