Сегодня, как и обещала, я продолжу тему планирования легких сущностей, которую уже начала в своем цикле статей. В нем я рассказала о внутреннем устройстве tasklet, workqueue и protothread. Конечно, тема не ограничивается лишь этими примерами: есть еще FreeRTOS с ее coroutine, или GNU Portable threads; или можно отойти от структур и библиотек, применяющихся в ОС, и вспомнить различные green threads, которых становится все больше и больше.

На этот раз я хочу поделиться тем, как мы реализовали легкие потоки в проекте Embox. С одной стороны мы постарались учесть опыт предыдущих разработок, с другой — привнести что-то новое.

С чего все началось


Проблема подступала к нам с разных сторон.

Потоки требуют своего стека, потребляя сравнительно много оперативной памяти. Кооперативная многозадачность у нас осуществлялась только за счет отключения таймера, ответственного за вызов планировщика по истечению кванта времени. К чему это привело? На платах с ограниченными ресурсами порой приходится совсем отказываться от многозадачности.

Кроме того, все чаще и чаще появлялись задачи, для которых было бы неплохо иметь какой-то механизм обработки событий. Желательно с приоритетами и примитивами синхронизации, причем чтобы обрабатывалось все это быстро и не требовало лишней памяти.

Это наталкивало нас на мысли о легковесной, почти кооперативной многозадачности для неких бесстековых сущностей. Но вместе с тем отказываться от полноценной многозадачности никто не собирался.

Поэтому мы решили не только ввести в систему легковесные единицы планирования, но и возложить работу диспетчеризации этих легких потоков на основной планировщик системы на общих правах с полноценными потоками. Потенциально мы видели следующие преимущества:
  • Можно избавиться от прослойки softirq и обрабатывать отложенные прерывания в высоких приоритетах очереди планировщика;
  • В общем-то, при должной настройке приоритетов и некоторые аппаратные прерывания можно обрабатывать таким же образом;
  • Гибкая настойка приоритетов, в том числе во время исполнения. Например, недавно перед нами стояла задача, в которой обязательно нужно было обрабатывать прерывания от таймера раньше, чем от сетевого стека. А некоторые задачи реального времени могут потребовать обработку высокоприоритетных пользовательских потоков вперед каких-то отложенных прерываний.

Итак, о чем мы мечтали:
  • Планировщик, который умеет управлять как обычными потоками, так и легкими;
  • Эффективные легкие потоки, этакие планируемые функции наподобие обработчиков событий с поддержкой синхронизации.


Для уменьшения связности системы, теперь планировщик будет заведовать абстрактными единицами планирования.

Schedee, или абстрактная единица планирования


Идея, на самом деле, довольно проста, особенно если ее рассматривать в терминах ООП. Взгляните на диаграмму:

Schedee — родительский абстрактный класс различных типов потоков. Каждый тип потока должен предоставлять какой-то интерфейс по управлению им. Планировщик ничего не знает о конкретных реализациях, а в своей функции он всего лишь определяет, кто будет исполняться следующим, делает какую-то подготовку и делегирует обработку конкретному schedee, например, вызвав специальную функцию doSomethng().

Далее я буду рассказывать о том, как мы применили эту идею в нашем планировщике.

На языке C наследование, к сожалению, приходится реализовывать руками, а именно на основе структуры как члена другой, более специфичной структуры. У нас будет абстрактная структура schedee, от которой наследуются конкретные типы-структуры thread и lthread и предоставляют реализацию необходимых “абстрактных” функций.

У нас уже были реализованы обычные POSIX-совместимые потоки и центральный планировщик, который ими управляет. Поэтому расширение функциональности планировщика происходило с оглядкой на уже существующую реализацию.

Первое, что нам нужно было сделать — это выделить из структуры обычных потоков часть, которой заведует планировщик. На основе этого мы и построим абстрактную структуру sсhedee.

Выделить общие поля из структуры оказалось довольно просто. Но нужно ли что-то еще? Чтобы ответить на этот вопрос, необходимо, во-первых, понять, как принципиально могут различаться типы потоков, и, во-вторых, разобраться с работой планировщика.

Потоки могу быть:
  • Вытесняемыми или кооперативными. Первые вытесняются потоками с более высоким приоритетом или по истечению кванта времени, а вторые передают управление планировщику сами. С точки зрения самой функции планировщика разницы здесь нет, различие только в том, когда и кем он будет вызван в следующий раз;
  • Со стеком или без стека. Наличие стека у потоков приводит к дополнительному расходу оперативной памяти и затраты на переключении контекста. Отсутствие же стека делает невозможным ожидание со сменой контекста и, как следствие, вытеснение (по крайней мере, полноценное). Эта разница уже принципиальна для планировщика, такие потоки должны обрабатываться по-разному;
  • С одной или несколькими точками входа. С одной точкой входа все просто: функция выполняется от и до. Наличие нескольких точек входа характерно для сопрограмм и итераторов. Для таких потоков необходимо хранить информацию о текущей точки входа, можно считать такую информацию аналогом контекста потока со стеком, ожидающего события. В любом случае, планировщик не должен об этом знать.

Посмотрим теперь, как был устроен планировщик. Код упрощен для лучшего понимания.
void schedule(void) {
    struct thread *prev, *next;
    ipl_t ipl;

    prev = thread_self();

    /* Защита от прерываний */
    ipl = ipl_save();

    if (!prev->waiting)
        runq_enqueue(&rq.queue, prev);

    next = runq_extract(&rq.queue);

    if (prev != next)                 // Это код, специфичный
        sched_switch(prev, next);     // для стековых потоков.

    ipl_restore(ipl);
}

Планировщик устроен довольно просто: он определяет, какой поток будет выполнятся следующим, и если поток сменился, то переключает контекст. После этого уже новый поток естественным образом выходит из функции планировщика и продолжает свою работу. То есть планировщик заточен на работу только со стековыми потоками.

Чтобы абстрагироваться от конкретных реализаций schedee, специфичную обработку нужно вынести в специальную функцию, назовем ее process(). Указатель на эту функцию будет находиться в schedee.

Теперь нужно понять, как обрабатывать потоки без стека. Что мы про них знаем?
Такие потоки не вытесняются. То есть не должно быть никаких неявных вызовов перепланирования;
Для бесстековых потоков не нужно переключать контекст. На самом деле, тут существуют разные подходы, в Linux, например, для последовательной обработки softirq выделяется отдельный стек. Мы пока остановились на варианте обработки таких потоков на стеке последнего исполняемого полноценного потока.

Таким образом, если для обычных потоков необходимо покидать функцию планировщика, то бесстековые потоки целесообразно обрабатывать в цикле прямо в планировщике, что, впрочем, довольно обычный подход. Цикл прекращается, как только на обработку пришел поток со стеком.
void schedule(int preempt) {
    struct schedee *prev, *next, *last;

    prev = schedee_get_current();

    /* Запрещаем прерывания */
    ipl_save();

    if (!prev->waiting)
        runq_enqueue(&rq.queue, prev);

    while (1) {
        next = runq_extract(&rq.queue);

        /* Прерывания разрешаются прямо внутри process() */
        last = next->process(prev, next);

        /* process() возвращает поток, на контекст которого мы переключились.
         * Если переключения не было, то был выполнен бесстековый поток,
         * крутимся дальше. */
        if (last)
            break;

        /* Запрещаем прерывания */
        ipl_save();
    }
}

Инкапсуляция — это хороший принцип, даже если мы имеем дело с процедурным программированием. Так что весь код планировщика должен знать только о schedee, но не о конкретных потоках. Вот полный код структуры schedee:
struct schedee {
    runq_item_t       runq_link; /* Ссылка в очереди runq */
    
    spinlock_t        lock; /* Для блокировки в случае SMP */

    /* Главная функция-обработчик, должна быть задана при инициализации
     * конкретной реализацией schedee. Как мы уже выяснили, в случае обычного
     * потока это переключение контекста, обработка сигналов и т.д.
     * Функция возвращает schedee, на стеке которого нужно продолжить 
     * исполнение. Если schedee кооперативный, то функция возвращает NULL. */
    struct schedee    *(*process)(struct schedee *prev, struct schedee *next);

    /* Эти три поля отвечают за состояния в машине состояний планировщика. */
    unsigned int active;
    unsigned int ready;
    unsigned int waiting;

    /* Специфичные поля */
    struct affinity         affinity;
    struct sched_timing     sched_timing;

    /* Приоритет в очереди runq */
    struct schedee_priority priority;

    /* Ссылка в очереди waitq. О том, почему она здесь, речь пойдет ниже. */
    struct waitq_link waitq_link;
};

И, напоследок, как выглядит функция process() у обычных потоков:
struct schedee *thread_process(struct schedee *prev, struct schedee *next) {
	struct thread *next_t, *prev_t;

	next_t = mcast_out(next, struct thread, schedee);
	prev_t = mcast_out(prev, struct thread, schedee);

	if (prev != next) {
		thread_context_switch(prev_t, next_t);
	}

	ipl_enable();

	if (!prev_t->siglock) {
		thread_signal_handle();
	}

	return &thread_self()->schedee;
}

Легкие потоки


Рассмотрим теперь легкие потоки Embox, ради которых все и затевалось. Наши легкие потоки бесстековые, кооперативные и поддерживают несколько точек входа.

В одной из задач необходимо было обрабатывать постоянно поступающую от внешнего источника информацию, причем с синхронизацией. Поэтому изначально стоял вопрос об адекватной поддержке синхронизации, что по факту упирается в умение:
  1. в первую очередь, например, захватывать мьютекс; в общем же случае — дожидаться любого условия (аналог условных переменных)
  2. в случае неудачи возвращать управление планировщику в середине функции
  3. начинать с нужного места, когда поток разбудят, например, при освобождении необходимого мьютекса.

В случае с обычными потоками все просто: при засыпании запрашивается вызов планировщика и происходит перепланирование. Поток при этом просто вызывает, например, функцию sleep(). Пункты 1 и 3 осуществляются за счет сохранности стека и контекста. С бесстековыми потоками все сложнее:
  • Из функции потока нужно выйти явно, чтобы стек оказался в том же состоянии, что и до запуска легкого потока.
  • Функция легкого потока начнется с самого начала. При повторном вызове локальные переменные перестают быть актуальными.


Для борьбы с последним пунктом мы решили пойти по пути сопрограмм, как Adam Dunkels в своей библиотеке protothread. Но, в отличие от Адама, мы решили не прятать логику за хитрыми макросами, а сделать интерфейс как можно более явным. Наше решение основано на расширении GCC Labels as Values. Это расширение позволяет получать ссылки на метки, а также рассчитывать отступ от какой-то конкретной метки, пользуясь привычными манипуляциями с указателями.
Легкие потоки и их структура

Чтобы разобраться во всем поподробнее, рассмотрим детали реализации. Во-первых, структура самих lthread:
struct lthread {
	struct schedee schedee;
	int (*run)(struct lthread *);
	int label_offset;
	struct schedee *joining;
	struct sched_wait_info info;
};

Первое, что стоит пояснить, это сигнатуру функции run. Так как у легких потоков нет стека, предполагается, что структура легких потоков нередко будет членом другой структуры, содержащей информацию, которую необходимо будет сохранять от вызова к вызову. Поэтому разумно использовать ссылку на сам легкий поток как главный аргумент, а к остальной информации обращаться с помощью приведения типов.

Возвращаемым значением функции run служит информация о входной точке. Если входная точка у функции предполагается одна или при перезапуске следует использовать начальную входную точку, то возвращаемым значением будет 0. Иначе возвращаемое значение задается макросом lthread_yield(). Этот и другие макросы я опишу ниже.

Поле label_offset — это отступ от начальной метки функции. Это служебное поле для сохранения входной точки потока. Работа с различными входными точками осуществляется с помощью макросов lthread_resume() и lthread_yield().

Поле joining — это ссылка на schedee потока, ожидающего завершения легкого потока, по аналогии с обычными потоками. Про это подробнее будет ниже.

И, наконец, info — это служебное поле с необходимой информацией для ожидания по таймауту.

API легких потоков

Базовое API легких потоков довольно простое. Функции инициализации и запуска вряд ли требуют пояснений:
void lthread_init(struct lthread *lt, int (*run)(struct lthread *));
void lthread_launch(struct lthread *lt);

Когда приложение отработало, необходимо убедиться, что легкие потоки завершились и их никто больше не разбудит. Если это не так, то небезопасно завершать приложение: пробуждение легкого потока может произойти тогда, когда его структура окажется неактуальной. Поэтому нужно сначала дождаться, чтобы поток был в нужном состоянии. Это напоминает поведение функции thread_join(). Нам же нужен его аналог для легких потоков:
extern void lthread_join(struct lthread *lt);

Сейчас у этой функции есть один недостаток: вызывать ее можно только из потока со стеком. На самом деле ее можно расширить до общего случая, если понадобится.

Есть еще одно важное замечание, касающееся этой функции. Для того, чтобы его понять, немного отвлечемся и обратим внимание на жизненный цикл легкого потока. Легкий поток может находиться только в двух состояниях: готовность и ожидание. Готовые легкие потоки лежат в очереди планировщика runq, и пока он находится в этом состоянии, добавить его в очередь во второй раз нельзя. Однако, как только готовый поток достается из очереди, он переходит в состояние ожидания, и теперь его можно вновь запланировать, в том числе это может сделать он сам. Точно так же себя ведут, например, tasklet’ы в Linux. Если легкий поток планирует себя сам (фактически, никогда не завершается), то поток, вызвавший lthread_join(), просто зависнет. Пользователь сам несет ответственность за то, чтобы легкий поток себя не планировал в этом случае.

Рассмотри часть API, предоставляющую управления метками, то есть множественными точками входа. Важное замечание: работа с метками осуществляется только в самой функции run(), но не в тех функциях, которые она вызывает. К сожалению, от этих ограничений никуда не деться при отсутсвии стека.

Макрос, который возвращает сохраненную ранее метку:
/* Шаблон использования: goto lthread_resume(lt, start); */
#define lthread_resume(lt, initial_label)             *((lt)->label_offset + (initial_label))

Макрос, который возвращает отступ относительно начальной метки:
/* Шаблон использования: return lthread_yield(lt, start, resume_point); */
#define lthread_yield(initial_label, target_label)             (target_label) - (initial_label);

Макрос lthread_resume() всегда должен идти в паре с оператором goto, а lthread_yield() — с оператором return.

Все функции ожидания и синхронизации для легких потоков действуют по одному принципу. Рассмотрим на примере макроса простого ожидания по таймауту:
SCHED_WAIT_TIMEOUT_LTHREAD(self, cond_expr, timeout);

Эта функция возвращает один из трех возможных кодов:
  • 0, если условие стало истинным;
  • ETIMEDOUT, если по истечению таймаута условие так и не выполнено;
  • EAGAIN, если условие еще не истинно, таймер заведен, необходимо выйти из функции легкого потока, чтобы вернуть управление планировщику.


Теперь рассмотрим функцию обработки легких потоков, которая вызывается из планировщика:

/* Блокировки: прерывания и планировщик */
static struct schedee *lthread_process(struct schedee *prev, struct schedee *next) {
    struct lthread *lt = mcast_out(next, struct lthread, schedee);
    schedee_set_current(next);

    /* Как уже говорилось, у легкого потока всего два состояния и они
     * взаимоисключающие. */
    next->ready = false;
    next->waiting = true;

    /* В функии process() необходимо разрешить прерывания. */
    ipl_enable();

    /* Вот здесь исполняется функция и сохраняется метка. Помним про макрос
     * lthread_yield(). */
    lt->label_offset =lt->run(lt);	

    /* Если поток в конечном или начальном состоянии, будим schedee, ожидающего
     * его завершения. */
    if (lt->joining && __lthread_is_disabled(lt))
		sched_wakeup(lt->joining);

    return NULL;
}

Пример


Для демонстрации рассмотрим игру Race, которая работает в два легких потока: один отвечает за перемещение дороги с препятствиями, другой проверяет, нажата ли управляющая клавиша.

Весь код игры лежит вот тут, здесь я приведу только основную функцию легкого потока, отвечающую за перемещение дороги.
static int move_road(struct lthread *self) {
    int to_wait;

    /* Функция lthread_resume() вернет указатель на сохраненную метку.
     * Если поток запущен впервые, то переход будет выполнен к метке,
     * указанной вторым аргументом - это начальная метка. */
    goto lthread_resume(self, &&update);

update:
    is_game_over |= is_obstacle(car_line_nr*RACE_ROAD_LEN + 1);
    if (is_game_over)
        return 0;

    race_update_road();
    race_print_road(road);

wait:
    to_wait = RACE_ROAD_UPD_MS - (step / RACE_LVL_STEP) * RACE_LVLUP_MS;

    if (SCHED_WAIT_TIMEOUT_LTHREAD(self, 0, to_wait) == -EAGAIN) {
        /* При возвращении из функции необходимо либо указать 0, что будет
         * соответствовать начальной метке, либо с помощью функции
         * lthread_yield() указать метку, к которой нужно будет перейти в
         * следующий раз. */
        return lthread_yield(&&update, &&wait);
    }

    goto update;
}

Это приложение было написано для демонстрации работы планировщика и легких потоков на плате STM32VLDISCOVERY с LCD экраном wh1602b. На самом деле, характеристики STM32VL позволяют запустить Embox с этой игрой и с вытесняемыми потоками: 8кб RAM и 128кб flash. Поэтому я приведу характеристику образов Embox для платы с игрой на легких потоках и на обычных потоках:
кооперативный совмещенный
text Os/O2 28724/31504 30152/33116
data 436 436
bss 3168 3102
Основной стек + стек потоков 1536 + 0 1078 + 1782*2

Напоследок видео с процессом игры.


Заключение


Весь код, который я привела в статье, можно посмотреть в github репозитории нашего проекта:

Работать нам еще есть над чем. Легкие потоки еще молодые и, скорее всего, они еще не раз будут меняться и улучшаться. Уже сейчас есть идеи и задачи:
  • Возможно, есть смысл разделить легкие потоки на два разных типа: с использованием сопрограмм и простейшие атомарные функции наподобие tasklet. К тому же, возможно, можно добиться функциональности сопрограмм и без меток, просто разбив функцию потока на несколько и и при каждом вызове заменяя указатель на функцию. Получилась бы этакая машина состояний.
  • Потенциально можно реализовать вытеснение легких потоков более приоритетными легкими потоками. Это важно для реального времени, но сложно с той точки зрения, что легкие потоки больше не будут атомарными, а, значит, могут потребоваться различные блокировки.
  • Из механизмов синхронизации пока для легких потоков реализованы только мьютексы, так как их мы чаще всего используем. Другие тоже ждут своего часа.

Разумеется, есть целый ряд подсистем, для которых хочется внедрить легкие потоки. Например, драйвер tty, подсистема pnet.

Со временем все эти проблемы будут решаться. Это можете сделать и вы :)

P.S. В Санкт-Петербурге каждую последнюю среду месяца проходят линуксовки. Я там еще раз расскажу про легкие потоки, про то, как устроены workqueue и taklet ядра Linux и, может, что-то еще :)
Там можно будет послушать и другие интересные выступления. Информацию смотрите в рассылке SPbLUG. Встречи походят в 19:00 по адресу: 10-ая линия В.О. 33-35, Географический факультет СПбГУ.

Комментарии (2)


  1. Indemsys
    24.04.2015 21:09

    Нельзя ли подробнее о «легких потоках».
    Какую же конкретно задачу вы успешно решили с их использованием.
    Какой выигрыш в наносекундах получили, на какой платформе.
    И имеете ли вообще тайминги работы хотя бы основных сервисов вашей RTOS на какой либо платформе?


    1. LifeV Автор
      25.04.2015 14:16

      1. Пока что легкие потоки используются у нас как замена прослойки softirq. Это не главное, для чего они создавались, но в скором времени мы применим их по назначению. Конкретная задача, которая привела к появлению потоков — это драйвер tty, который использовал потоки и синхронизацию мьютексами, а нужно было это на маленькой плате запускать. Тогда из-за отсутствия легких потоков мы пошли обходным путем. Сейчас приоритеты у нас немного другие, но потом мы переведем tty на легкие потоки.
      2. О выигрыше во времени говорить тут не очень уместно. Легкие потоки работают быстрее обычных потоков, да, и в ближайшие пару недель я произведу соответствующие замеры. Но вот, например, если реализовать кооперативную многозадачность только через switch, это будет, очевидно, быстрее. Тут скорее речь идет об удобстве программирования с наименьшими затратами на память и такты. Да еще и круто, что любой легкий поток может синхронизоваться с обычными, передавать сообщения друг другу. То есть они не крутятся каждый в своей песочнице.
      3. Какие-то замеры у нас есть, хоть и не в систематизированном виде. Может, вас что-то конкретное интересует? Я попрошу коллег конкретные цифры предоставить.