Планировщик — мозг операционной системы. Его задача: решать, какая задача выполняется сейчас, и по каким правилам выдавать процессор другим задачам. Для embedded систем это особенно критично: ресурсы ограничены, реальное время важно, а поведение должно быть предсказуемым.

Это вторая из цикла статей про создание микроядерной операционной системы. В прошлой статье рассматривался таймер и HAL. Для вновь пришедших необходимо сначала ознакомиться с ней: https://habr.com/ru/articles/935058/


Разбор полётов

При реализации переключения задач, я обмолвился, что сделал всё довольно грязно, затёр несколько регистров. Как оказалось в дальнейшем, на AVR это в ряде случаев приводило к неопределённому поведению и порождало трудноуловимые баги. У меня было достаточно времени, чтобы всё исправить, и я полностью пересмотрел логику сохранения и восстановления контекста, вынеся их в отдельный ассемблерный код .S. Так и проще читается, и не нужно продумывать клобберы. Напомню основные моменты.

.section .text

.equ __SP_L__, 0x3D
.equ __SP_H__, 0x3E
.equ __SREG__, 0x3F

; Вместо хранения данных о состоянии в самой задаче,
; они теперь вынесены в переменные внутри реализации HAL
.extern rei_avr_saved_pc
.extern rei_avr_saved_sreg

.global __rei_hal_save_context
.type   __rei_hal_save_context, @function
/*
 * void __rei_hal_save_context(uint16_t sp, uint16_t pc, uint8_t sreg);
 * ABI (avr-gcc):
 *   arg1 (sp)   -> r25:r24 (r24 - low, r25 - high)
 *   arg2 (pc)   -> r23:r22 (r22 - low, r23 - high)
 *   arg5 (sreg) -> r20
 * We will use r17:r16 as temporaries to store old SP.
 */
__rei_hal_save_context:
	; Сохраняем текущий стек перед изменением SP, дабы затем в нему вернуться.
	; Поскольку мы используем для этого пару регистров r17:r16, сначала пушим их.
	push r16
	push r17
	in   r16, __SP_L__
	in   r17, __SP_H__

	; Кладём в SP вершину стека сохраняемой задачи
	out  __SP_L__, r24
	out  __SP_H__, r25

	; Пушим адрес текущей инструкции и регистр состояния SREG
	push r22  ; pc_high
	push r23  ; pc_low
	push r20  ; sreg

	; Пушим все остальные регистры
	push r0
	push r1
	clr  r1  ; очищаем r1: по соглашению AVR, он должен равняться 0
	push r2
	push r3
	push r4
	push r5
	push r6
	push r7
	push r8
	push r9
	push r10
	push r11
	push r12
	push r13
	push r14
	push r15
	push r16
	push r17
	push r18
	push r19
	push r20
	push r21
	push r22
	push r23
	push r24
	push r25
	push r26
	push r27
	push r28
	push r29
	push r30
	push r31

	; Возвращаем старый стек и достаём из него сохранённые r17:r16
	out  __SP_L__, r16
	out  __SP_H__, r17
	pop  r17
	pop  r16

	ret

	.size __rei_hal_save_context, .-__rei_hal_save_context

.global __rei_hal_restore_context
.type   __rei_hal_restore_context, @function
/*
 * void __rei_hal_restore_context(uint16_t sp);
 *   arg1 (sp) -> r25:r24
 */
__rei_hal_restore_context:
	; Меняем стек на переключаемую задачу. Сохранять старый не нужно - возврата не будет.
	out  __SP_L__, r24
	out  __SP_H__, r25

	; Восстанавливаем контекст (регистры) из стека задачи
	pop  r31
	pop  r30
	pop  r29
	pop  r28
	pop  r27
	pop  r26
	pop  r25
	pop  r24
	pop  r23
	pop  r22
	pop  r21
	pop  r20
	pop  r19
	pop  r18
	pop  r17
	pop  r16
	pop  r15
	pop  r14
	pop  r13
	pop  r12
	pop  r11
	pop  r10
	pop  r9
	pop  r8
	pop  r7
	pop  r6
	pop  r5
	pop  r4
	pop  r3
	pop  r2
	pop  r1
	pop  r0

	; После выталкивания общих регистров, выталкиваем и регистр состояния SREG.
	; Выталкивать PC не нужно и невозможно вручную - он теперь и так на вершине
	; стека и будет использован для возврата.
	pop  r27
	out  __SREG__, r27

	; Возвращаемся из функции. Поскольку у нас сейчас на вершине стека адрес
	; прерванной операции внутри задачи, возврат произойдёт прямо в неё, как
	; будто она сама вызвала __rei_hal_restore_context(sp).
	ret

	.size __rei_hal_restore_context, .-__rei_hal_restore_context

Изменению также подвергся таймер. В прошлой версии при прерывании все "ядерные" операции использовали стек текущей задачи, а это нарушало латентность. Я выделил отдельную область память под стек ядра и при срабатывании таймера теперь сначала переключаюсь на него, сохраняю все регистры, и только затем вызываю основной обработчик. Обёртка была полностью написана на ассемблере, без использования сишных вставок.

Byte rei_avr_core_stack[REI_CORE_STACK_SIZE];
Address rei_avr_core_stack_base = (Address) rei_avr_core_stack + REI_CORE_STACK_SIZE - 1;

ISR(TIMER1_COMPA_vect) {
	if (rei_core.scheduler.current_task) {
		// Встроенный макрос: сохраняет адрес возврата из прерывания
		rei_avr_saved_pc = (Address) __builtin_return_address(0);
		rei_avr_saved_sreg = SREG |= 0x80;
	}

	rei_hal_tick();
}
.section .text

.equ __SP_L__, 0x3D
.equ __SP_H__, 0x3E
.equ __SREG__, 0x3F

.extern rei_avr_core_stack_base
.extern rei_core_tick

.global rei_hal_tick
.type   rei_hal_tick, @function
/*
 * void rei_hal_tick();
 */
rei_hal_tick:
	; Сохраняем вершину стека текущей задачи, чтобы затем к ней вернуться
	in r18, __SP_L__
	in r19, __SP_H__

	; Загружаем в пару регистров r25:r24 значение rei_avr_core_stack_base,
	; то есть, низа стека ядра. Не стоит беспокоиться о его вершине,
	; каждая ISR (Interrupt Service Routine) будет просто использовать его с нуля.
	; Главное - обеспечить атомарность.
	lds r24, rei_avr_core_stack_base
	lds r25, rei_avr_core_stack_base+1
	out __SP_L__, r24
	out __SP_H__, r25

	; Сохраняем регистры
	push r0
	push r1
	clr  r1
	push r2
	push r3
	push r4
	push r5
	push r6
	push r7
	push r8
	push r9
	push r10
	push r11
	push r12
	push r13
	push r14
	push r15
	push r16
	push r17
	push r18
	push r19
	push r20
	push r21
	push r22
	push r23
	push r24
	push r25
	push r26
	push r27
	push r28
	push r29
	push r30
	push r31

	; Вызываем основной обработчик
	call rei_core_tick

	; Восстанавливаем регистры
	pop r31
	pop r30
	pop r29
	pop r28
	pop r27
	pop r26
	pop r25
	pop r24
	pop r23
	pop r22
	pop r21
	pop r20
	pop r19
	pop r18
	pop r17
	pop r16
	pop r15
	pop r14
	pop r13
	pop r12
	pop r11
	pop r10
	pop r9
	pop r8
	pop r7
	pop r6
	pop r5
	pop r4
	pop r3
	pop r2
	pop r1
	pop r0

	; Восстанавливаем вершину стека
	out __SP_L__, r18
	out __SP_H__, r19

	ret

	.size rei_hal_tick, .-rei_hal_tick

Предварительные ласки

Вообще-то, если следовать строгим требованиям к микроядерной архитектуре, планировщик должен находиться в user-space, то есть, быть изолирован от ядра в качестве модуля. Э. Танненбаум писал, что системы, не придерживающиеся данного правила - не по настоящему микроядра, и критиковал даже собственные разработки. Вместо прямого вызова функции планирования, таймер должен класть в IPC сообщение для планировщика. Штож, вот несколько причин, почему это правило будет проигнорировано:

  • Мы стремимся обеспечить минимальную задержку переключения, а шина IPC будет здорово тормозить;

  • Несмотря на кажущееся сокращение объёма самого ядра, объём планировщика сильно возрастает для обработки сообщений от таймера;

  • Полностью вынести весь планировщик в user-space всё равно не выйдет, поскольку нам нужен прямой доступ к пространству ядра при переключении.

  • Зачем в таком деле, как ОСдев, следовать строгим правилам? Мы ведь не копию линукса пишем, а полностью свой проект;

  • Какое ещё такое IPC? Я обещал в этой статье уже показать работу ядра в действии, а реализация IPC и системных вызовов займёт ещё как минимум одну.

Таким образом, слишком много аргументов против ТРУЪ микроядерного подхода лишь ради того, чтобы называться микроядром. Планировщик будет реализован полностью в ядре, и точка.

Архитектура

В основе лежат простые структуры:

  • ready_head - односвязный список задач, готовых к выполнению (отсортирован по приоритету, максимум в голове);

  • current_task - указатель на выполняющуюся задачу;

  • ticks_left - квант времени для задачи (зависит от приоритета).

Типичный цикл планировщика:

  1. Если нет current_task - run_first() выбирает и запускает первую задачу;

  2. Если current_task есть - уменьшаем ticks_left

  3. Если ticks_left исчерпан, то есть, сравнялся с 0 - ставим задачу в ready и переключаемся на следующую.

Код и комментарии

Ниже - ключевые фрагменты кода и детальные комментарии к ним.

Обновлённая структура задачи

typedef int64_t TaskID;
typedef int8_t TaskPriority;
typedef uint16_t TaskFlags;
typedef int16_t TaskExitStatus;

typedef enum {
	REI_TASK_READY = 0,
	REI_TASK_RUNNING,
	REI_TASK_SLEEPING,
	REI_TASK_BLOCKED,
	REI_TASK_TERMITANED,
} TaskState;

typedef void (*TaskEntry)();

typedef struct task_t {
	struct task_t * next;
	struct task_t * parent;
	TaskID id;
	TaskState state;
	TaskPriority priority;
	TaskFlags flags;
	Tick ticks_left;
	Tick sleep_ticks_left;
	Stack stack;
	TaskEntry entry;
	TaskExitStatus exit_status;
	uint64_t n_children;
	struct task_t ** children;
} Task;

Инициализация планировщика

bool rei_scheduler_init() {
	rei_core.n_tasks = rei_core.next_task_id = 0;
	rei_core.scheduler.current_task =
    	rei_core.scheduler.ready_head =
    	rei_core.scheduler.sleeping_head = NULL;
	return true;
}

Хорошая и простая инициализация. Не лишним будет убедиться, что rei_core.next_task_id никогда не превышает размеров массива задач, добавим проверку при rei_task_create().

Добавление в ready-list (вставка по приоритету)

void rei_scheduler_add(Task * task) {
	if (!task) {
		return;
	}

	task->ticks_left = 0;
	task->state = REI_TASK_READY;

	if (!rei_core.scheduler.ready_head) {
		rei_core.scheduler.ready_head = task;
		return;
	}

	if (task->priority > rei_core.scheduler.ready_head->priority) {
		task->next = rei_core.scheduler.ready_head;
		rei_core.scheduler.ready_head = task;
		return;
	}

	Task * it = rei_core.scheduler.ready_head;
	while (it->next && it->next->priority >= task->priority) {
		it = it->next;
	}
	task->next = it->next;
	it->next = task;
}

Вставка упорядочена по убыванию приоритета. То самое поле next, встроенное прямо в Task, понадобилось здесь для реализации связного списка FIFO. При отсутствии задач, новая становится головой. Если голова есть, но приоритет её ниже новой задачи, задача заменяет её место. Иначе, при равных приоритетах новая задача добавляется после существующих - это обеспечивает fair round-robin для задач с одинаковым приоритетом. Перед вставкой обнуляется task->next, чтобы избежать неожиданного поведения, если задача уже была в другом списке (например, sleeping).

Перевод приоритета в тики (выделенный квант времени)

static inline Tick priority_to_ticks(TaskPriority priority) {
	return (Tick) ((priority + 128) / 8 + 1);
}

Привязка приоритета к количеству тиков - простая линейная формула. Если будете писать свою ОС, подумайте о том, какой диапазон TaskPriority используется, и какова частота тиков у таймера - от этого зависит реальная длительность кванта.

Запуск первой задачи (no current)

static inline noreturn void run_first() {
	Task * first_task = rei_scheduler_next();
	if (!first_task) {
		// panic
	}

	first_task->ticks_left = priority_to_ticks(first_task->priority);
	first_task->state = REI_TASK_RUNNING;
	rei_core.scheduler.current_task = first_task;

	rei_core.hal.switch_task(NULL, first_task);
	// should never reach here
}

switch_task(NULL, first_task) - HAL должен корректно обрабатывать old_task == NULL. Это переход к контексту первой задачи, и возврата из функция run_first() уже не будет.

Основной метод планирования

void rei_scheduler_schedule() {
	if (!rei_core.scheduler.current_task) {
		run_first();
	}

	if (rei_core.scheduler.current_task->state == REI_TASK_RUNNING) {
		if (--rei_core.scheduler.current_task->ticks_left > 0) {
			return;
		}

		rei_scheduler_add(rei_core.scheduler.current_task);
	}

	run_next();
	// unreachable if next task found
}

Здесь цикл понятен: есть текущая задача - уменьшаем ticks_left; если исчерпан - кладём задачу в ready и переключаемся. Обратите внимание: --ticks_left должен выполняться под защитой от прерываний (см. раздел про гонки).

Yield

void rei_scheduler_yield() {
	if (rei_core.scheduler.current_task) {
		rei_core.scheduler.current_task->ticks_left = 1;
		rei_core.hal.tick();
	}
}
void rei_hal_capture_state() {
	cli();
	rei_avr_saved_pc = (Address) __builtin_return_address(0);
	rei_avr_saved_pc += 2;  // пропускаем сам yield
	rei_avr_saved_sreg = SREG |= 0x80;
	sei();
}

yield() заставляет текущую задачу уступить CPU - здесь это достигается установкой ticks_left = 1 и вызовом hal.tick() (который, по контракту, инициирует вызов планировщика). Далее, согласно логике планирования, ticks_left обнуляется, заставляя ядро переключиться на следующую задачу. rei_hal_capture_state() должен быть вызван перед rei_scheduler_yield(), чтобы сохранить pc и sreg для контекста.

Зачем это нужно? Задачи условно можно разделить на два класса: алгоритмические и реагирующие. Алгоритмические выполняют долгие вычисления, и им действительно нужно выделять как можно больше процессорного времени. Пример - конвертация видео в другой кодек, либо раскодирование этого самого видео. Реагирующие же работают рывками: после получения требуемых данных они некоторое время не получают новых и зря занимали бы процессорное время, которого им требуется совсем немного. Пример - курсор мыши. С другой стороны, для плавной работы, такие задачи должны вызываться чаще алгоритмических. Мы пока отложим это в сторону и просто реализуем добровольную передачу процессора через yield.

Забавный факт: некоторые планировщики вообще не переключают задачи автоматически по таймеру, полностью полагаясь на yield. То есть, в таких системах все задачи должны рано или поздно вызывать yield для переключения. Забавным здесь является то, что к таким системам относилась старые Mac OS до 10 версии.

Инварианты, которые нужно поддерживать

Для корректности планировщика критично поддерживать несколько инвариантов:

  • Уникальность размещения: задача должна находиться ровно в одном месте - ready или current;

  • Атомарность операций со списками: все вставки/удаления должны выполняться с выключенными прерываниями (или под lock);

  • Корректное состояние current_task: если не NULL, то state == REI_TASK_RUNNING;

  • switch_task есть no-return: проектируйте код после вызова switch_task с этим учётом.

Нарушение инвариантов — частая причина сложных багов.

Гонки и критические секции

Планировщик взаимодействует с ISR (таймер), поэтому гонки — обычная вещь. Простая рекомендация для микроконтроллеров: для операций со списками используем cli()/sei() (или сохранение/восстановление SREG на AVR).

Пример защищённой секции на AVR:

Производительность: сложность и оптимизации

Текущий код имеет следующие сложности:

  • rei_scheduler_add() — O(n) (обход ready-list).

  • rei_scheduler_next() — O(1).

Оптимизации:

  • Bitmap/priority-queue для ready-list — если диапазон приоритетов ограничен, можно сделать O(1) выбор максимального приоритета (builtin_clz).

  • Array of FIFO queues (по приоритетам) — вставка O(1), выбор O(1) (сканирование от max-priority вниз; с bitmap ещё быстрее).

Алгоритмические улучшения и паттерны

Aging (во избежание starvation)

Если низкоприоритетные задачи долго не получают CPU, можно постепенно повышать их "эффективный" приоритет.

Priority Inheritance (проблема priority inversion)

Если есть mutex, и low-prio держит ресурс, ожидаемый high-prio, low-prio нужно временно повышать (inherit) до уровня ожидающего.

Разные политики планирования (для загугливания)

  • Round-robin per-priority - уже частично реализуется.

  • Multilevel feedback queues - адаптивная система для смешанных нагрузок.

  • EDF / RMS - для реального времени с дедлайнами.

Таймер, гранулярность и tickless режим

  • Tick duration влияет на отзывчивость: маленький tick — лучший отклик, но больший overhead. Большой tick — плохой отклик, низкий overhead.

  • Tickless idle — экономит энергию: вместо периодического тика ставите таймер на ближайшее событие и идёте в режим сна. Сложнее в реализации (нужно корректно обрабатывать остальное время).

ЧТО?

Не пугайтесь, это была матчасть для более глубокого погружения, мы не станем её сейчас реализовывать. Но важно помнить, что наш планировщик хоть и простой для понимания, всё же в дальнейшем потребует доработки теми напильниками, что я описал.

Использование

Как я и обещал, на данном этапе операционную систему уже можно использовать в качестве RTOS. Пока что у нас нет изолированного пользовательского пространства, так что придётся создавать задачи и настраивать планировщик вручную. У нас будут 4 задачи: idle_entry - простой, дабы при отсутствиии других задач планировщик не сходил с ума. Просто передаёт пранирование дальше. И три мигающих светодиода на 2, 4 и 7 пинах.

#include "../inc/core.h"
#include "../inc/task.h"
#include "../inc/scheduler.h"

#include <avr/io.h>
#include <avr/interrupt.h>

extern ReiCore rei_core;
extern HAL hal_avr;

static inline void toggle_pin(volatile uint8_t * port, uint8_t bit) {
	*port ^= (1 << bit);
}

void idle_entry() {
	// Тест стека: кроме контекста, там могут находиться ещё данные самой задачи.
	// В данном случае, просто помещаем байт со значением 90 на его дно.
	*((Byte*) rei_core.scheduler.current_task->stack.top--) = 90;

	while (true) {
		rei_core.hal.capture_state();
		rei_scheduler_yield();
	}
	rei_core.scheduler.current_task->exit_status = 0;
}

void blink_pin2_entry() {
	DDRD |= (1 << PD2);
	PORTD &= ~(1 << PD2);

	while (true) {
		toggle_pin(&PORTD, PD2);
		// Делаем паузу, в течение которой таймер также может прерывать задачу
		for (volatile uint32_t i = 0; i < 70000UL; ++i) { asm volatile ("nop"); }
	}
	rei_core.scheduler.current_task->exit_status = 0;
}

void blink_pin4_entry() {
	DDRD |= (1 << PD4);
	PORTD &= ~(1 << PD4);

	while (true) {
		toggle_pin(&PORTD, PD4);
		for (volatile uint32_t i = 0; i < 40000UL; ++i) { asm volatile ("nop"); }
	}
	rei_core.scheduler.current_task->exit_status = 0;
}

void blink_pin7_entry() {
	DDRD |= (1 << PD7);
	PORTD &= ~(1 << PD7);

	while (true) {
		toggle_pin(&PORTD, PD7);
		for (volatile uint32_t i = 0; i < 100000UL; ++i) { asm volatile ("nop"); }
		rei_scheduler_sleep(rei_core.scheduler.current_task, 40);
	}
	rei_core.scheduler.current_task->exit_status = 0;
}

#define IDLE_STACK_SIZE  64
#define BLINK_STACK_SIZE 64

Byte idle_stack_memory[IDLE_STACK_SIZE];
Byte blink_pin2_stack_memory[BLINK_STACK_SIZE];
Byte blink_pin4_stack_memory[BLINK_STACK_SIZE];
Byte blink_pin7_stack_memory[BLINK_STACK_SIZE];

int main() {
	// Я задал умопомрачительное значение интервала между тиками для демонстрации
	if (!rei_core_init(hal_avr, 0, 100000)) {
		return 1;
	}

	Stack idle_stack  = {
		.size = IDLE_STACK_SIZE,
		.base = (Address) idle_stack_memory + IDLE_STACK_SIZE - 1,
	};
	Stack blink_pin2_stack = {
		.size = BLINK_STACK_SIZE,
		.base = (Address) blink_pin2_stack_memory + BLINK_STACK_SIZE - 1,
	};
	Stack blink_pin4_stack = {
		.size = BLINK_STACK_SIZE,
		.base = (Address) blink_pin4_stack_memory + BLINK_STACK_SIZE - 1,
	};
	Stack blink_pin7_stack = {
		.size = BLINK_STACK_SIZE,
		.base = (Address) blink_pin7_stack_memory + BLINK_STACK_SIZE - 1,
	};

	rei_task_create(NULL, &idle_stack,       idle_entry);
	rei_task_create(NULL, &blink_pin2_stack, blink_pin2_entry);
	rei_task_create(NULL, &blink_pin4_stack, blink_pin4_entry);
	rei_task_create(NULL, &blink_pin7_stack, blink_pin7_entry);

	rei_core_main();

	for (;;);  // should never reach here
}

Вот что получилось. Задачки переключаются, светодиоды моргают по очереди и остаются в том состоянии, на котором их застал таймер.

Заключение и дальнейшие шаги

Планировщик, который мы разобрали, - отличный старт для RTOS на микроконтроллере. Она проста, понятна и производительна для небольшого числа задач. Основные направления для улучшения:

  • добавить защиту операций со списками (atomics/cli/sei),

  • рассмотреть bitmap/priority-queues при ограниченном количестве приоритетов,

  • добавить механизмы для priority inheritance и aging.

Но это всё позже. Сейчас анонс следующей статьи. Сначала я планировал постепенно переходить от AVR к RISC-V, Cortex-M и более взрослым архитектурам, но у нас ещё как минимум нет той самой IPC, системных вызовов и управления памятью. Помните, я писал, что при разработке операционной системы мы сталкиваемся с тем, что не можем больше использовать привычное динамическое выделение памяти? Конечно, на AVR есть своя clib с malloc, но и её ещё предстоит обернуть в HAL. До x86 же нам ещё долго. Посудите сами: если вам не знакомы такие аббревиатуры, как GDT, IDT, PIT, MMU, TLB, то перейти к современным компьютерам получится не раньше, чем мы рассмотрим RISC-V и ARM. Так что, в следующей статье разберём IPC и системные вызовы, и возможно, попробуем потихоньку начать портировать нашу операционную систему на RISC-V.

Комментарии (0)