В прошлой статье был рассмотрен процесс создания без стековых сопрограмм на основе библиотеки прото потоков. При этом вся работа по хранению состояния сопрограммы полностью ложится на плечи ее создателя. Сегодня я хочу рассказать о том, как можно автоматизировать выполнение этой задачи при помощи стековых корутин, рассмотрю 3 способа передачи управления от одной сопрограммы к другой. Кроме того я опишу с  какими проблемами приходиться сталкиваться при создании стековых сопрограмм. Продолжим писать на языке Си и добавим ассемблер.

Для начала вспомним сопрограмму для вычисления чисел Фибоначчи, рассмотренную в предыдущей статье. Она не имела собственного стека, все используемые в коде переменные хранились в специально созданной структуре, передававшейся в корутину как входной параметр. Выполнение управлялось машиной состояний, созданной при помощи конструкции switch-case.

Я хочу избавиться от этой структуры и хранить состояние сопрограммы при помощи локальных переменных, как это делается в обычных функциях на языке программирования Си. Также хочу перейти от использования машины состояний к применению функций, похожих на обычные функции в языке программирования Си. Решить эти задачи можно несколькими способами. Перейдем к их рассмотрению.

Способ №1. Использование функций setjmp и longjmp

Функции setjmp и longjmp в Cи используются для выполнения нелокального перехода, который позволяет прервать выполнение функции, в том числе вложенной, и вернуться к определенной точке в другой функции. Setjmp сохраняет состояние выполнения программы в заданной точке в переменной типа jmp_buf, являющейся её единственным параметром. Longjmp восстанавливает это состояние, передавая управление обратно в точку вызова setjmp. Setjmp возвращает 0, если был выполнен прямой вызов.

Функция longjmp имеет два параметра. Первый - это состояние, сохраненное при помощи setjmp. Longjmp вызывается, когда нужно прервать выполнение текущей функции и вернуться к сохраненному состоянию. Программа продолжает выполнение с инструкции, следующей за вызовом setjmp. В случае возврата из longjmp значение, которое вернет setjmp, определяется вторым параметром longjmp. При этом значение 0 в качестве второго параметра longjmp передать нельзя, оно будет заменено на 1. 

Описание получилось довольно сложным. Для лучшего понимания работы этих функций давайте рассмотрим их простейшую реализацию. Так как речь идет о сохранении и восстановлении состояния регистров ЦП, то для выполнения этих действий мы вынуждены использовать ассемблер. При этом неизбежно попадаем в зависимость от архитектуры центрального процессора. Примеры к статье разработаны для архитектуры x86-64 (регистры 64 битные). Кроме того у нас также есть зависимость от ABI (двоичный интерфейс приложения), используемого в операционной системе в которой будет запускаться наша программа. 

Так как примеры к статье тестировались на ОС семейства Linux, то использовался System V ABI. ABI - это набор спецификаций, описывающих соглашения о вызовах функций, форматы объектных файлов, форматы исполняемых файлов и т.д. На сегодняшний день это стандартный ABI, используемый операционными системами Unix, Linux и другими. 

Кратко опишем передачу параметров в функции. Они передаются через регистры rdi, rsi, rdx, rcx, r8 и r9. При этом первый параметр, считая слева направо, передается через rdi, второй через rsi, третий через rdx и т.д. Параметры с плавающей точкой передаются через регистры xmm0–xmm7.

Функции вызываются с помощью инструкции call. Она помещает адрес команды, следующей за call, в стек и переходит к выполнению вызываемой функции. Этот адрес будем называть адресом возврата. Возврат к вызывающей функции осуществляется с помощью инструкции ret, которая извлекает адрес возврата из стека и продолжает выполнение программы с инструкции, расположенной по этому адресу. 

Возвращаемое значение вызываемой функции сохраняется в регистре rax, или, если это 128-битное значение, то младшие 64 бита хранятся в rax, а старшие 64 бита помещаются в rdx. Вызываемая функция должна сохранить регистры rbx, rsp, rbp, r12, r13, r14 и r15. 

Исходя из приведенного описания, подумаем как должен выглядеть аналог jmp_buf. Назовем его my_jmp_buf. Нам нужно сохранить 8 64-битных регистров rbx, rsp, rbp, r12 - r15, а также указатель команд rip. Поэтому my_jmp_buf - это массив из восьми элементов типа uint64_t. Далее рассмотрим ассемблерную реализацию функций my_setjmp и my_longjmp на  GNU-ассемблере (gas). Исходный код можно посмотреть здесь.

.file "jmp_functions.s"
.text
// int my_setjmp(my_jmp_buf env)
// Saves the context state to env and returns 0.
.globl my_setjmp
.type my_setjmp,@function

my_setjmp:
	movq %rbx,(%rdi)     /* Save rbx */
	movq %rbp,0x08(%rdi) /* Save rbp */
	movq %r12,0x10(%rdi) /* Save r12 */
	movq %r13,0x18(%rdi) /* Save r13 */
	movq %r14,0x20(%rdi) /* Save r14 */
	movq %r15,0x28(%rdi) /* Save r15 */
	leaq 8(%rsp),%rdx    /* Save rsp */
	movq %rdx,0x30(%rdi) 
	movq (%rsp),%rdx     /* Save return addr to buffer */
	movq %rdx,0x38(%rdi)
	xor %eax,%eax        /* store eax = 0 */
	ret

// void my_longjmp(my_jmp_buf env, int val)
// Restores the context state from env and transfers control, returning val
.globl my_longjmp
.type my_longjmp,@function

my_longjmp:
	movq (%rdi),%rbx         /* Restore rbx */
	movq 0x08(%rdi),%rbp	 /* Restore rbp */ 
	movq 0x10(%rdi),%r12	 /* Restore r12 */
	movq 0x18(%rdi),%r13	 /* Restore r13 */
	movq 0x20(%rdi),%r14	 /* Restore r14 */
	movq 0x28(%rdi),%r15	 /* Restore r15 */
  	movq 0x30(%rdi),%rsp     /* Restore rsp */
	
	movq %rsi, %rax         /* Set rax = val */
	testl %eax, %eax        /* Check If val == 0 */
	jnz to_exit             /* val != 0 go to_exit */
    incl %eax               /* Increment rax if it value equals to 0 */
to_exit:    
	jmpq *0x38(%rdi)        /* goto address */

Дадим некоторые пояснения по коду. Директива .text, говорит о том, что следующий за ней код является исполняемыми инструкциями программы. Директива .globl используется в ассемблере, чтобы объявить метку (например my_setjmp), определенную в текущем файле, видимой для компоновщика и доступной для других объектных файлов. Без этой директивы, символы считаются локальными по умолчанию и не видны за пределами того файла, в котором они определены.

Директива .type <some_name>, @function используется для того, чтобы передать компоновщику и отладчику информацию о том что some_name - это функция. По сути имя функции - это имя метки в ассемблере. Так как my_setjmp имеет один параметр - буфер для хранения состояния регистров, то его адрес передается через регистр rdi. Имена сохраняемых регистров предваряются символом %. Команда movq осуществляет копирование значения, хранящегося при помощи первого операнда (в my_setjmp первым операндом выступает регистр) во второй операнд, который представляет собой смещение в памяти для входного параметра типа буфер. Скобки вокруг %rdi говорят о том, что в данном регистре хранится начальный адрес буфера. Шестнадцатеричное значение перед скобкой указывает смещение в байтах относительно начала буфера. Так как регистры 64 битные, то каждое последующее смещение больше предыдущего на 8 байт. По сути, это похоже на обращение к элементам массива в программе на языке Си с той лишь разницей, что вместо индекса используется смещение в памяти от начала буфера.

В начале my_setjmp копирует значения 7 регистров rbx - r15 в буфер типа my_jmp_buf. Далее происходит сохранение регистра rip. Как говорилось ранее, перед вызовом my_setjmp адрес возврата помещается в стек. Этот адрес будет записан в регистр rip после возврата из функции. На адрес указывает регистр rsp. Его чтение происходит командой movq (%rsp), %rdx. Круглые скобки вокруг rsp говорят о том, что мы обращаемся не к значению, хранящемуся в rsp, а в память по адресу, записанному в этом регистре. Далее из rdx адрес записывается в буфер инструкцией movq %rdx, 0x38(%rdi). Две команды movq нужны потому что пересылать данные из памяти в память в архитектуре x86 нельзя. Далее происходит запись в rax 0, так как setjmp возвращает 0, и функция завершается.

Реализация my_longjmp представлена ниже и делает обратное - извлекает значения, сохраненные в буфере, и копирует их в соответствующие регистры. Так как функция имеет 2 параметра, то адрес первый из них, представляющий адрес буфера в памяти передается через регистр rdi, а второй - возвращаемое значение val в регистре rsi.

Это значение проверяется на 0 командой testl %eax, %eax и в случае равенства происходит увеличение eax на 1, так как longjmp не может вернуть 0 по описанию. Запись сохраненного ранее значения rip происходит командой jmpq *0x38(%rdi). Это косвенный переход по адресу. Адрес берется из буфера со смещением, указанным в команде. В архитектуре x86 непосредственно записывать в регистр rip нельзя.

Таким образом после завершения longjmp мы переходим на адрес, следующий за вызовом setjmp. Обычно по этому адресу расположен условный оператор, позволяющий определить был ли вызван setjmp непосредственно или управление вернулось после вызова longjmp. Теперь посмотрим как можно использовать эти функции для создания корутин, используя в качестве основы пример, взятый отсюда.

#include <stdio.h>
#include <stdlib.h>
#include "my_setjmp.h"

int max_iters, i, j;
my_jmp_buf   main_continuation;        
my_jmp_buf   coro1_continuation;
my_jmp_buf   coro2_continuation;    

void      coro1(void);
void      coro2(void);

void  main(int  argc, char* argv[])
{
     max_iters = 5;
     if (my_setjmp(main_continuation) == 0)
          coro1();
     if (my_setjmp(main_continuation) == 0)
          coro2();
     my_longjmp(coro1_continuation, 1);
}

void  coro1(void)
{
     if (my_setjmp(coro1_continuation) == 0) 
     {
          i = 1;
          my_longjmp(main_continuation, 1);
     }
     while (1) 
     {
          printf("Coro1 i: (%d), addr i: %p", i, &i);
          i++;
          if (my_setjmp(coro1_continuation) == 0)
               my_longjmp(coro2_continuation, 1);
     }
}

void  coro2(void)
{
     if (my_setjmp(coro2_continuation) == 0) 
     {
          j = 1;
          my_longjmp(main_continuation, 1);
     }
     while (1) 
     {
          printf("Coro2 j:(%d), addr j: %p\n", j, &j);
          j++;
          if (j > max_iters)
               exit(0);
          if (my_setjmp(coro2_continuation) == 0)
               my_longjmp(coro1_continuation, 1);
     }
}

Первый вызов функции my_setjmp в main сохраняет текущее состояние в main_continuation. Так как my_setjmp вызывается непосредственно, то она возвращает 0 и main вызывает coro1. Как только выполнение переходит в эту функцию, происходит сохранение состояния регистров ЦП как было описано выше в переменную coro1_continuation, инициализируется переменная i и управление возвращается в main при помощи my_longjmp. В main опять сохраняется состояние ЦП и осуществляется вызов coro2, логика которой та же, что и у coro1. coro2 возвращает управление main, которая осуществляет прыжок в coro1 вызовом my_longjmp(coro1_continuation, 1). В coro1 условие if (my_setjmp(coro1_continuation) == 0) не выполняется из-за того что мы переходим в сохраненное состояние вызвав my_longjmp. Начинается цикл while, в котором печатаем значение i, а также адрес, в памяти, занимаемый этой переменной и передаем управление в coro2 вызовом my_longjmp(coro2_continuation, 1).

В coro2 также начинает выполняться цикл while. Его логика аналогична циклу в coro1. После проверки условия завершения программы осуществляется прыжок в coro1. Описанные действия повторяются пока не будет достигнуто заданное количество итераций.

Этот пример иллюстрирует тот факт, что можно писать корутины не прибегая к switch-case. Вместо них можно использовать setjmp/longjmp. Если запустить эту программу, то получим следующий вывод:

Coro1 i: (1), addr i: 0x55f27a9f50e0 - Coro2 j:(1), addr j: 0x55f27a9f5040
Coro1 i: (2), addr i: 0x55f27a9f50e0 - Coro2 j:(2), addr j: 0x55f27a9f5040
Coro1 i: (3), addr i: 0x55f27a9f50e0 - Coro2 j:(3), addr j: 0x55f27a9f5040
Coro1 i: (4), addr i: 0x55f27a9f50e0 - Coro2 j:(4), addr j: 0x55f27a9f5040
Coro1 i: (5), addr i: 0x55f27a9f50e0 - Coro2 j:(5), addr j: 0x55f27a9f5040

Как видно глобальные переменные i и j последовательно увеличивают свое значение с 1 до 5. Однако, мы хотим избавиться от глобальных переменных при написании сопрограмм. Что будет, если сделать i локальной переменной в coro1, а j - локальной в coro2? Внеся изменения и снова запустив программу получим неожиданный результат:

Coro1 i: (1), addr i: 0x7ffe84f9ef74 - Coro2 j:(2), addr j: 0x7ffe84f9ef74
Coro1 i: (3), addr i: 0x7ffe84f9ef74 - Coro2 j:(4), addr j: 0x7ffe84f9ef74
Coro1 i: (5), addr i: 0x7ffe84f9ef74 - Coro2 j:(6), addr j: 0x7ffe84f9ef74

Во-первых, верный порядок увеличения значений i и j нарушен, а во-вторых обе переменные имеют один и тот же адрес памяти. В чем же дело?

Проблема заключается в выделении памяти в стеке. Когда начинается выполнение функции main и до того, как будет вызван первый my_setjmp, в стеке находится только один кадр функции main. Затем вызывается coro1, и в стеке появляется еще один кадр. coro1 использует my_longjmp для возврата управления в основную программу, фактически восстанавливая стек до точки вызова первого my_setjmp. Когда main вызывает my_setjmp повторно, а затем еще и coro2, кадр стека для функции coro2, скорее всего, займет то же место, которое ранее было выделено для coro1! Поскольку coro1 и coro2 имеют только одну локальную переменную, она, весьма вероятно, будет использовать одно и то же место в стеке. Точнее, весьма вероятно, что переменные i из coro1 и j из coro2 будут иметь один и тот же адрес, что и иллюстрируется выводом программы. Поэтому, когда coro1 выводит и увеличивает значение i, coro2 теряет свое исходное значение j и выводит следующее значение i.

Чтобы решить эту проблему, необходимо выделить отдельное пространство стека для каждой сопрограммы. Кроме того хотелось бы очистить исходный код наших корутин от вызовов my_setjmp/my_longjmp так как их применение ухудшает читабельность кода и, на мой взгляд, не является хорошим стилем программирования.

Для решения описанных задач предлагается создать планировщик, который будет создавать сопрограммы, планировать их выполнение, а также переключаться между ними, используя my_setjmp и my_longjmp. Такой подход уже был описан на Хабре в этой статье. Возьмем изложенные в ней идеи за основу. Также логика работы функций init_task, choose_task, task_yield, приведенных далее, во многом повторяет ту, что используют scheduler_create_task, scheduler_choose_task и scheduler_relinquish, описанные в статье.

Для начала создадим структуру, описывающую сопрограмму.

enum task_status 
{
	TASK_CREATED,
	TASK_RUNNING,
	TASK_WAITING,
	TASK_FINISHED,
};

typedef void (*task_func)(void*);

struct task 
{
	enum task_status status;
	int id;
#if defined(USE_SETJMP)
	sjlj_continuation continuation;
	void *stack_top;
#elif defined(USE_CONTEXT)
	context_continuation continuation;
#elif defined(USE_ASMCONTEXT)
	struct asm_context_continuation continuation;
#endif
	void *stack;
 	task_func func;
	void *arg;
#if defined(CORO_USE_VALGRIND)
  	int valgrind_id;
#endif
};

Сопрограмма может находиться в одном из следующих состояний, хранящемся в переменной status: создана, но не запущена (TASK_CREATED), запущена на выполнение (TASK_RUNNING), не выполняется, ожидая наступления некоторого события (TASK_WAITING) и завершена (TASK_FINISHED). Кроме того для удобства отладки каждая сопрограмма имеет целочисленный номер, хранимый в id.

Также у корутины есть указатель stack, хранящий адрес области со стеком и указатель task_func на функцию сопрограммы, принимающую единственный аргумент arg. Через него можно передавать входные данные, упаковав их в структуру. Да-да, без структуры и здесь мы пока не обойдемся, но хранить она будет входные параметры, а не локальные переменные.

Кроме того каждая сопрограмма содержит переменную continuation, которая хранит ее внутреннее состояние. На мой взгляд, это состояние можно отнести к неограниченным продолжениям, о которых упоминалось в предыдущей статье. Согласно Википедии, такие продолжения действительно представляют собой состояние всей программы (или одной её нити) в определённый момент.

Также в описаниях функций, с помощью которых можно реализовывать сопрограммы, можно встретить другое понятие - контекст задачи. Он определяется как минимальный набор данных, используемых задачей, который необходимо сохранить, чтобы задача могла быть прервана, а затем восстановлена с того же места. В данной статье продолжение и контекст будут использоваться как слова синонимы.

В статье описывается 3 способа организации продолжения. Конкретный способ задается при помощи одного из макросов USE_SETJMP, USE_CONTEXT или USE_ASMCONTEXT. Сейчас предполагаем что определен USE_SETJMP. В этом случае continuation представляет собой массив my_jmp_buf для хранения состояния регистров ЦП как было описано выше. Назначение переменной valgrind_id рассмотрим позже.

Для планирования выполнения сопрограмм используется двух связный список, каждый элемент которого хранит указатели на предыдущий элемент, сопрограмму и последующий элемент списка:

struct task_list_item 
{
	struct task_list_item *next;
	struct task *task;
	struct task_list_item *prev;
};

Планировщик описывается следующей структурой:

struct scheduler_data
{
	#if defined(USE_SETJMP)
		sjlj_continuation continuation;
	#elif defined(USE_CONTEXT)
		context_continuation continuation;
	#elif defined(USE_ASMCONTEXT)
		struct asm_context_continuation continuation;
	#endif

 	struct task_list_item *head; 	
 	struct task_list_item *current;
 	struct task_list_item *tail;
};

В ней также хранится экземпляр продолжения, которое используется для возврата к планировщику. Тип продолжения, как и в структуре task, зависит от способа переключения сопрограмм. Кроме того, планировщик хранит указатели на голову, текущий элемент и хвост списка планирования корутин.

Создание сопрограммы происходит в функции create_task.

void create_task(void (*func)(void *), void *arg)
{
	struct task* task = init_task(func, arg);
	
	init_task_stack(task, default_stack_size);
	
	struct task_list_item *list_item = init_list_item(task);
	insert_task_list_item_tail(&scheduler_data.head, list_item, &scheduler_data.tail);
}

Вначале происходит инициализация сопрограммы через init_task. Далее выделяется память под стек путем вызова init_task_stack. После этого создается экземпляр task_list_item, который затем вставляется в список для планирования вызовом insert_task_list_item_tail. Код для работы с этим списком тривиален, его, при желании, можно посмотреть в репозитории.

В функции init_task выделяется память под сопрограмму. После этого корутина переходит в состояние TASK_CREATED, происходит сохранение указателей на функцию, отвечающую за логику работы сопрограммы, и её аргумент, а также задается порядковый номер.

struct task* init_task(void (*func)(void *), void *arg)
{
	static int id = 1;
	struct task* task = malloc(sizeof(*task));
	if(!task)
		return NULL;

	task->status = TASK_CREATED;
	task->func = func;
	task->arg = arg;
	task->id = id++;

	return task;
}

Функция choose_task отвечает за выбор текущей сопрограммы для выполнения из списка планирования.

struct task *choose_task(void)
{
	scheduler_data.current = scheduler_data.head;
	while(scheduler_data.current)
	{
		if((scheduler_data.current->task->status == TASK_RUNNING) || (scheduler_data.current->task->status == TASK_CREATED)) 
		{
			scheduler_data.current = remove_task_list_item_head(&scheduler_data.head, &scheduler_data.tail);
			insert_task_list_item_tail(&scheduler_data.head, scheduler_data.current, &scheduler_data.tail);

			return scheduler_data.current->task;
		}
	}
	return NULL;
}

Для этого в качестве текущего узла вначале выбирается головной элемент списка. Если он не нулевой, то в цикле смотрим состояние сопрограммы, хранящейся в текущем узле списка. Если сопрограмма в состоянии создана или запущена, то соответствующий элемент удаляется из головы списка и помещается в его хвост. После этого вызывающему коду передается указатель на сопрограмму, хранящуюся в данном элементе списка.

Теперь рассмотрим как планировщик работает на практике. Для этого опять напишем программ для вычисления последовательностей чисел Фибоначчи. Для иллюстрации работы планировщика по переключению между корутинами будем вычислять две последовательности. У каждой в качестве параметров задается количество элементов и номер последовательности при помощи структуры task_params:

struct task_params 
{
	unsigned  num_iters;
	unsigned  task_id;
};

Функция вычисления чисел Фибоначчи выглядит следующим образом:

void fibonacci(void* arg)
{
	unsigned long  Fn, Fn_2 = 0, Fn_1 = 1;
	int i = 0;

	struct task_params *tsk = (struct task_params *)arg;
	for (i = 0; i < tsk->num_iters; i++) 
	{
		if(i == 0)
			Fn = Fn_2;
		else
			if(i == 1) 
				Fn = Fn_1;
			else
			{
				Fn = Fn_2+Fn_1;
				Fn_2 = Fn_1;
				Fn_1 = Fn;
			}
		printf("task %d: %d-th Fibonacci number is: %ld\n", tsk->task_id, i+1, Fn);
		task_yield();
	}
}

Я думаю, вы уже заметили вызов task_yield, который приостанавливает работу функции fibonacci и передает управление планировщику.

И, наконец, главная функция программы:

struct task_params tsk1, tsk2;

int main(int argc, char **argv)
{
	tsk1.task_id = 1;
	tsk2.task_id = 2;
	
	tsk1.num_iters = 3;
	tsk2.num_iters = 7;

	init_scheduler();

	create_task(fibonacci, (void*)&tsk1);
	create_task(fibonacci, (void*)&tsk2);

	run_tasks();
	return EXIT_SUCCESS;
}

Здесь всё тривиально. Вначале задаются входные параметры для сопрограмм. Далее вызовом init_scheduler инициализируется пустой список задач планировщика. После этого при помощи create_task создаются 2 сопрограммы и добавляются в список планирования. Run_tasks запускает планировщик, который работает до тех пор, пока не будут выполнены все задачи из списка. Рассмотрим функцию run_tasks.

void run_tasks(void)
{
	my_setjmp(scheduler_data.continuation);
	struct task *curr = NULL;

	if(scheduler_data.current && (scheduler_data.current->task->status == TASK_FINISHED))
	{
		remove_task_tail(&scheduler_data.head,scheduler_data.current, &scheduler_data.tail);
		scheduler_free_current_task();
	}
	while(curr = choose_task())
	{
		schedule_task(curr);
	}
	printf("Finished run_tasks!\n");
}

Данная функция играет роль основного цикла для планировщика. Она определяет какая сопрограмма должна выполняться следующей и передает ей управление. Также run_tasks должна получать управление когда в коде сопрограммы вызывается task_yield. Для того чтобы задать точку возврата, используется вызов my_setjmp аргументом которого служит продолжение, содержащее состояние планировщика. После этого проверяется не завершилась ли текущая сопрограмма. Если она завершилась, то её нужно исключить из списка планирования вызовом remove_task_tail и освободить память при помощи scheduler_free_current_task.

После этого вызовом choose_task из очереди планирования выбирается очередная задача. Она в зависимости от своего состояния (поле status в структуре task) начинает либо продолжает выполняться при помощи функции schedule_task, работу которой рассмотрим далее.

void schedule_task(struct task *next)
{
	if (!next) 
	{
		printf("Invalid task in scheduler! \n");
		return;
	}

	if (next->status == TASK_CREATED) 
	{
		register void *top = next->stack_top;
		__asm__ volatile(
			"movq %[rs], %%rsp \n"
			: [ rs ] "+r" (top) ::
		);

		next->status = TASK_RUNNING;
		next->func(next->arg);
		next->status = TASK_FINISHED;

		my_longjmp(scheduler_data.continuation, long_jmp_value);
	} 
	else
		if (next->status == TASK_RUNNING) 
			my_longjmp(next->continuation, long_jmp_value);
}

Если задача находится в состоянии TASK_CREATED (создана, но не запущена), то перед ее запуском необходимо модифицировать регистр rsp таким образом, чтобы он хранил адрес вершины стека (stack_top) для сопрограммы. Стек адресуется при помощи регистра rsp. Стек в архитектуре x86_64 растет вниз. Изменение значения rsp происходит при помощи ассемблерной вставки. Спецификатор volatile нужен для того чтобы компилятор не пытался оптимизировать ассемблерный код. Адрес, хранящийся в переменной top передается в регистр. Какой именно это будет регистр, насколько я понимаю, определяет компилятор. Далее содержимое этого регистра копируется в rsp.

Теперь задача готова к запуску. Все последующие вызовы my_setjmp/my_longjmp для данной сопрограммы будут сохранять и восстанавливать состояние регистров ЦП, используя указанный блок памяти. В ней также будут храниться локальные переменные корутины. После установки стека меняем статус сопрограммы и запускаем ее на выполнение в первый раз, вызывая функцию, переданную в корутину, с аргументом.

Далее сопрограмма работает пока не вызовет task_yield, либо не завершится, достигнув конца функции. Во втором случае ее состояние будет изменено с TASK_RUNNING на TASK_FINISHED. После этого управление вызовом my_longjmp будет передано функции run_tasks планировщика, который удалит сопрограмму из очереди планирования и очистит занимаемую ею память.

Рассмотрим функцию task_yield, которая вызывается сопрограммой.

void task_yield(void)
{
	if (my_setjmp(scheduler_data.current->task->continuation)) 
		return;
	else 
		my_longjmp(scheduler_data.continuation, long_jmp_value);
}

При первом вызове task_yield сопрограмма уже находится в состоянии TASK_RUNNING. В этом вызове my_setjmp возвращает 0, корутина приостанавливает свое выполнение и с помощью my_longjmp переключается на метод run_tasks планировщика, который извлекает очередную задачу из очереди и запускает её. Когда очередь снова доходит до сопрограммы, вызвавшей task_yield, она передается в schedule_task, и ей возвращается управление вызовом my_longjmp(next->continuation, long_jmp_value). В результате осуществляется переход в task_yield. Так как он произошел из my_longjmp, то возвращается ненулевой результат и выполняется условие if (my_setjmp(scheduler_data.current->task->continuation)). В результате task_yield завершается и выполнение приостановленной корутины продолжается с кода, следующего за task_yield. Описанные действия повторяются до тех пор, пока не завершится основная функция корутины. После этого сопрограмма перейдет в TASK_FINISHED и управление будет передано в run_tasks вызовом my_longjmp.

Функции планировщика, логика которых отличается в зависимости от макросов USE_SETJMP, USE_CONTEXT, USE_ASMCONTEXT, вынесена в репозитории в отдельные папки. Для USE_SETJMP код расположен здесь.

Запустив программу, получаем следующий результат:

task 1: 1-th Fibonacci number is: 0
task 2: 1-th Fibonacci number is: 0
task 1: 2-th Fibonacci number is: 1
task 2: 2-th Fibonacci number is: 1
task 1: 3-th Fibonacci number is: 1
task 2: 3-th Fibonacci number is: 1
task 2: 4-th Fibonacci number is: 2
task 2: 5-th Fibonacci number is: 3
task 2: 6-th Fibonacci number is: 5
task 2: 7-th Fibonacci number is: 8
Finished run_tasks!

Чередующийся вывод результатов task 1 и task 2 свидетельствует о том, что выполнение корутин прерывается планировщиком, как это было описано выше.

Таким образом мы достигли целей, заявленных в начале статьи:

  1. Больше не нужно вручную организовывать хранение локальных переменных и состояния для продолжения сопрограммы в отдельной структуре. Эти задачи решаются автоматически при помощи стека.

  2. В код корутины больше не нужно встраивать конечный автомат при помощи макросов, что значительно повышает его читаемость и способствует уменьшению вероятности ошибок.

Однако, за всё нужно платить. И как часто бывает в процессе разработки, решая одни проблемы, мы создаем другие. О чём это я? А вот о чём:

  1. При выделении блока памяти для сопрограммы неизбежно возникает вопрос о его размере. Ведь в этом блоке корутина не только хранит свое состояние. Она также может вызывать другие функции, в том числе рекурсивные, которые будут хранить свои локальные переменные и адрес возврата в том же блоке. Это может привести к непредсказуемому поведению программы в случае если стек перерастет размер выделенной памяти. Самое легкое решение проблемы - выделение памяти "с запасом", но величину этого запаса далеко не всегда легко подсчитать. Скорее всего, придется выделять больше памяти, чем реально необходимо. Таким образом, описанные в предыдущей статье без стековые корутины потребляют значительно меньше памяти чем стековые. В общем вопрос о выделении достаточного объема памяти довольно сложный и его рассмотрение заслуживает отдельной статьи.

  2. Манипулируя регистрами процессора для установки стека, мы лишаемся кроссплатформенности и попадаем в зависимость не только от архитектуры процессора, но и от ABI.

  3. Инструментальным средствам выявления ошибок при работе с памятью типа Valgrind не нравятся манипуляции со стеком, что порождает поток предупреждений следующего вида:

    ==22277== Warning: client switching stacks? SP change: 0x1ffefffc10 --> 0x4a6a0f0
    ==22277== to suppress, use: --max-stackframe=137344146208 or greater
    task 1: 1-th Fibonacci number is: 0
    ==22277== Warning: client switching stacks? SP change: 0x4a6a088 --> 0x1ffefffc40
    ==22277== to suppress, use: --max-stackframe=137344146360 or greater
    ==22277== Warning: client switching stacks? SP change: 0x1ffefffc10 --> 0x4a6e240
    ==22277== to suppress, use: --max-stackframe=137344129488 or greater
    ==22277== further instances of this message will not be shown.

  4. Использование функций setjmp/longjmp, также как и goto, не является рекомендуемой практикой программирования.

Для решения проблемы 3 можно воспользоваться API по отслеживанию ресурсов памяти, который предоставляется Valgrind. Помните переменную valgrind_id в объявлении структуры task? Это идентификатор стека, возвращаемый вызовом VALGRIND_STACK_REGISTER(start, end) и говорящий Valgrind о том, что адрес памяти, начинающийся с адреса start и заканчивающийся адресом end, является стеком. Этот вызов вставляется в код при выделении памяти под стек в функции init_task_stack. Использование VALGRIND_STACK_REGISTER позволяет избавиться от сообщений о переключении стека. При освобождении памяти в функции delete_task_list_item нужно сказать Valgrind, что адреса, связанные с идентификатором, больше стеком не является. Делается это вызовом VALGRIND_STACK_DEREGISTER(valgrind_id).

Что же делать если я не хочу использовать setjmp/longjmp? Можно переключаться между сопрограммами при помощи "контекстных" функций, о которых поговорим далее.

Способ №2. Использование функций getcontext, makecontext и swapcontext

Согласно Википедии, эти функции определены в стандарте POSIX.1-2001 и используются для управления контекстом. В статье понятия контекст и продолжение будем считать синонимами. Контексты позволяют создавать нескольких взаимодействующих потоков управления.

Функции для работы с контекстами и связанные с ними типы данных определены в заголовочном файле ucontext.h. Основным типом данных является структура типа ucontext_t, некоторые члены которой представлены ниже:

 typedef struct ucontext 
 {
   struct ucontext *uc_link;
   sigset_t uc_sigmask;
   stack_t uc_stack;
   mcontext_t uc_mcontext;
...
 } ucontext_t;

uc_link указывает на контекст, который будет восстановлен при выходе из текущего, если контекст создан с помощью makecontext. uc_sigmask используется для хранения сигналов, заблокированных в контексте, а uc_stack является стеком, используемым контекстом. uc_mcontext используется для хранения состояния исполнения, включая все регистры центрального процессора, счётчик команд и указатель стека.

Кратко опишем назначение функций работы с контекстом:

  1. int getcontext(ucontext_t *ucp) сохраняет текущий контекст в ucp. При возникновении ошибки возвращает -1.

  2. void makecontext(ucontext_t ucp, void func(), int argc, ...) устанавливает альтернативный поток управления в ucp, предварительно инициализированный с помощью getcontext. Поле ucp.uc_stack должно указывать на место для стека необходимого размера. При совершении переключения в ucp с помощью swapcontext исполнение начинается с точки входа в функцию func с числом аргументов argc. При завершении func управление передаётся ucp.uc_link. В func можно передавать аргументы. При этом в параметр argc нужно передать их количество. После этого перечислить аргументы через запятую. Если параметров нет, то argc = 0.

  3. int swapcontext(ucontext_t oucp, ucontext_t ucp) - передаёт управление ucp и сохраняет текущее состояние выполнения в oucp. Если возникла ошибка, то возвращает -1. В случае успешной передачи управления ничего не возвращается.

Рассмотрим как использовать эти функции в планировщике. Код можно посмотреть здесь. Начнем с инициализации сопрограммы.

void init_task_stack(struct task* task, int stack_size)
{
	task->stack = malloc(stack_size);
    if(!task->stack)
	{
		printf(memory_alloc_error_msg);
        exit(EXIT_FAILURE);
	}

	if(getcontext(&task->continuation) == -1)
		handle_context_error("getcontext");

	task->continuation.uc_stack.ss_sp = task->stack;
    task->continuation.uc_stack.ss_size = stack_size;
  	task->continuation.uc_link = &scheduler_data.continuation;

	makecontext(&task->continuation, (void (*)()) task_call, 0);
}

После выделения памяти под сопрограмму происходит сохранение контекста вызовом getcontext в переменную task->continuation, имеющую тип ucontext_t. Далее инициализируем переменную task->continuation.uc_stack.ss_sp указателем на выделенную ранее под стек область памяти. Переменная task->continuation.uc_stack.ss_size хранит размер области памяти под стек, а task->continuation.uc_link содержит указатель на контекст, который получит управление после завершения текущего. Далее вызовом makecontext осуществляется привязка контекста task->continuation к функции task_call.

void task_call(void)
{
	scheduler_data.current->task->func(scheduler_data.current->task->arg);
	scheduler_data.current->task->status = TASK_FINISHED;
}

Она запускает функцию корутины и по ее завершению меняет статус сопрограммы на TASK_FINISHED аналогично тому, как это делалось в функции schedule_task, рассмотренной ранее.

Теперь посмотрим на функцию task_yield.

void task_yield(void)
{
	if (swapcontext(&scheduler_data.current->task->continuation, &scheduler_data.continuation) == -1)
    	handle_context_error("swapcontext");
}

Напомню что эта функция вызывается из сопрограммы и передает управление планировщику. В данном случае передача управления происходит от контекста задачи scheduler_data.current->task->continuation в контекст планировщика, хранящийся в переменной scheduler_data.continuation. При этом текущее состояние выполнения сопрограммы сохраняется в scheduler_data.current->task->continuation.

И, наконец, рассмотрим запуск сопрограмм в планировщике.

void run_tasks(void)
{
	struct task *next = NULL;

	while(next = choose_task())
	{
		if (swapcontext(&scheduler_data.continuation, &next->continuation) == -1)
            handle_context_error("swapcontext");
		
		if(next->status == TASK_FINISHED)
			scheduler_free_current_task();
	}
	printf("Finished run_tasks!\n");
}

Процесс запуска корутин значительно упростился. Так как нам больше не нужно вручную работать с регистрами ЦП, то отпадает необходимость различать состояния TASK_CREATED и TASK_RUNNING. Описанная ранее функция schedule_task, также больше не нужна.

Теперь мы в цикле получает указатель на следующую сопрограмму и переключаемся на нее вызовом swapcontext. При этом в переменной scheduler_data.continuation сохраняется адрес той команды в цикле while, которая будет выполнена после передачи управления из корутины в планировщик. В результате переключения контекста начнет выполняться функция task_call, которая вызовет функцию корутины. Она будет выполняться пока не вернет управление планировщику вызовом task_yield. Будет проверено состояние сопрограммы. Если она завершена, то происходит её удаление из списка планирования вызовом scheduler_free_current_task и описанные действия повторяются.

Необходимо акцентировать внимание на том, что после завершения работы task_call управление возвращается в функцию run_tasks. Так происходит потому что в функции init_task_stack мы инициализировали task->continuation.uc_link адресом scheduler_data.continuation, указав, что контекст планировщика будет выполняться после завершения контекста task->continuation.

Применение контекстных функций связано со следующими сложностями:

  1. Никуда не делась проблема определения объема памяти, необходимого для корректной работы сопрограмм.

  2. Хотя контекстные функции и являлись частью стандарта POSIX.1-2001, они не поддерживались всеми Unix подобными ОС. В POSIX.1-2004 эти функции были объявлены устаревшими и убраны в POSIX.1-2008. Однако, в некоторых ОС, таких, как Ubuntu, ими все еще можно пользоваться.

Что же делать, если я не хочу пользоваться контекстными функциями так же, как и setjmp/longjmp? Можно вручную реализовать функцию передачи управления между корутинами на ассемблере или воспользоваться существующими библиотеками.

Способ №3. Использование ассемблерной функции переключения контекста

В данном случае мы в некотором смысле комбинируем оба рассмотренных выше подхода. С одной стороны опять возникает необходимость устанавливать стек вручную как это было при использовании my_setjmp/my_longjmp. С другой стороны нужно задавать функции перехода как в примере с makecontext. Естественно опять не обойтись без ассемблера. Для того чтобы этот способ работал на различных ЦП необходима отдельная реализация для каждой процессорной архитектуры и ABI. Для того чтобы оценить масштаб работ можно посмотреть директорию asm репозитория библиотеки Boost Context. Мы рассмотрим простейшую реализацию для x86_64 с использованием System V ABI. Исходный код можно посмотреть здесь.

Считаем что определен макрос USE_ASMCONTEXT. Тогда переменная task->continuation имеет тип asm_context_continuation.

struct asm_context_continuation
{
	uint64_t rsp;
	uint64_t r15;
	uint64_t r14;
	uint64_t r13;
	uint64_t r12;
	uint64_t rbx;
	uint64_t rbp;

};

По сути это та же структура, содержащая регистры, которую мы рассматривали в примере с setjmp/longjmp. Отличие состоит в том, что мы не храним в ней содержимое регистра указателя команд rip.

Рассмотрим функцию инициализации стека сопрограммы.

void init_task_stack(struct task* task, int stack_size)
{
	task->stack = malloc(stack_size);
    if(!task->stack)
        exit_with_message(memory_alloc_error_msg);

	*(uint64_t *)(task->stack + stack_size -  8) = (uint64_t)task_yield;
	*(uint64_t *)(task->stack + stack_size - 16) = (uint64_t)task_call;
	task->continuation.rsp = (uint64_t)(task->stack + stack_size - 16);
}

В начале функции происходит выделения памяти под стек. Так как мы не храним значение rip в структуре task->continuation, то необходимо позаботиться о том, чтобы при переключении контекста управление перешло к сопрограмме. Как этого достичь? Напомню, что в архитектуре x86_64 стек растет от старших адресов к младшим. Инициализируем переменную task->continuation.rsp адресом вершины стека task->stack + stack_size - 16, предварительно поместив в память, расположенную по этому адресу, начальный адрес функции task_call, которая вызовет сопрограмму. Чуть ниже по стеку (адрес task->stack + stack_size - 8) запишем начальный адрес функции task_yield. Смысл этого действия состоит в том, что после завершения работы сопрограммы и выхода из task_call произошел переход к функции task_yield. Она вернет управление планировщику корутин, как описывалось ранее.

Логика работы task_yield практически не изменилась по сравнению с предыдущим способом.

void task_yield(void)
{
	contextswitch(&scheduler_data.current->task->continuation, &scheduler_data.continuation);
}

Она вызывает функцию contextswitch, которая переключает контекст и написана на ассемблере. Первый параметр - буфер для хранения значений регистров task->continuation передается через регистр rdi. Второй - буфер из которого инициализируются регистры передается через rsi.

.text
.globl contextswitch
.type contextswitch,@function
contextswitch:

        movq     %rsp, 0x00(%rdi)
        movq     %r15, 0x08(%rdi)
        movq     %r14, 0x10(%rdi)
        movq     %r13, 0x18(%rdi)
        movq     %r12, 0x20(%rdi)
        movq     %rbx, 0x28(%rdi)
        movq     %rbp, 0x30(%rdi)


        movq     0x00(%rsi), %rsp
        movq     0x08(%rsi), %r15
        movq     0x10(%rsi), %r14
        movq     0x18(%rsi), %r13
        movq     0x20(%rsi), %r12
        movq     0x28(%rsi), %rbx
        movq     0x30(%rsi), %rbp

        ret

Основная часть работы contextswitch состоит в сохранении значений регистров в буфер, адресуемый первым параметром, и запись значений из буфера, адресуемого вторым параметром, в регистры. В конце выполняется команда ret, которая снимает с вершины стека, адресуемого посредством rsp адрес возврата и переходит по нему.

Логика запуска сопрограмм в планировщике также мало изменилась.

void run_tasks(void)
{
	struct task *next = NULL;

	while(next = choose_task())
	{
		contextswitch(&scheduler_data.continuation, &next->continuation);
		if(scheduler_data.current->task->status == TASK_FINISHED)
			scheduler_free_current_task();
	}

	printf("Finished run_tasks!\n");
}

После извлечения адреса следующей задачи происходит вызов функции contextswitch. При этом на вершину стека помещается адрес следующей команды. Contextswitch сохраняет текущие значения регистров в scheduler_data.continuation после чего загружает в эти регистры значения из next->continuation. При этом регистр rsp указывает на новый стек, подготовленный в init_task_stack. По команде ret берет с вершины адрес возврата - адрес функции task_call и переходит к ее выполнению. В функции task_call происходит обратное действие - возврат управления планировщику.

У читателя может возникнуть следующий вопрос. А есть ли готовые библиотеки, которые позволяют писать код с использованием без стековых сопрограмм, не погружаясь в ассемблер и прочие технические детали? Да, есть. Вот те, которые мне удалось найти.

Это libcoro и libtask, Boost Context и использующие его Boost Coroutine и Coroutine2. Список не претендует на полноту и может быть расширен.

Выводы:

  1. Стековые корутины позволяют сделать код чище по сравнению с тем, что приходилось писать с использованием прото потоков.

  2. За это приходиться платить значительно большим расходом памяти для стека.

  3. При реализации стековых сопрограмм приходится использовать низкоуровневые операции на ассемблере, что делает его зависимым от архитектуры центрального процессора и ABI.

    В следующей статье мы наконец-то перейдем к использованию С++ и рассмотрим как использовать сопрограммы для реализации TCP сервера с неблокирующим вводом-выводом. Продолжение следует...

Комментарии (0)