Постановка задачи

В программировании микроконтроллеров часто нужно написать простые тестировочные прошивки. При этом надо некоторые функции вызывать чаще, а некоторые реже. Вот например как тут

Для этого конечно можно запустить FreeRTOS, однако тогда код не будет переносим на другие RTOS, например Zephyr RTOS/TI-RTOS/RTEMS/Keil RTX/Azure RTOS или SafeRTOS. Потом прошивку как код часто приходится частично отлаживать на PC а там никакой RTOS в помине нет.

Поэтому надо держать наготове какой-нибудь простенький универсальный переносимый кооперативный NoRTOS планировщик с минимальной диагностикой и возможностью в run-time отключать какие-то отдельные задачи для отладки оставшихся.

Проще говоря нужен диспетчер задач для микроконтроллера.

Определимся с терминологией

Кооперативный планировщик - это такой способ управления задачами, при котором задачи сами вручную передают управления другим задачам.

Супер-цикл - тело оператора бесконечного цикла в NoRTOS прошивках. Обычно бесконечный цикл в прошивках можно найти по таким операторам как for(;;){} или while(1){}

Bare-Bone сборка - это сборка прошивки на основе API какой-нибудь RTOS, где только один поток и этот поток прокручивает супер-цикл с кооперативным планировщиком. Эта сборка нужна главным образом только для отладки RTOS: настройки стека, очередей и прочего.

Ядром любого планировщика является генератор или источник стабильного тактирования. Желательно с высокой разрешающей способностью. Микросекундный таймер. Это может быть SysTick таймер с пересчетом в микросекунды или отдельный аппаратный таймер общего назначения. Обычно аппаратных таймеров от 3х до 14ти в зависимости от модели конкретного микроконтроллера. Также важно, чтобы таймер возрастал. Так проще и интуитивно понятнее писать код нам человекам, так как мы привыкли к тому что время оно всегда непрерывно идет вперед, а не назад.

Для определенности будем считать, что все задачи имеют одинаковых прототип

 bool task_proc(void);

Сначала надо определить типы данных.

#ifndef TASK_GENERAL_TYPES_H
#define TASK_GENERAL_TYPES_H

/*Mainly for NoRtos builds but also for so-called RTOS Bare-Bone build*/
#include <stdbool.h>

#include "task_const.h"
#include "limiter.h"

typedef struct  {
    uint64_t period_us;
    bool init;
#ifdef HAS_LIMITER
    Limiter_t limiter;
#endif
    const char* const name;
} TaskConfig_t;

#endif /* TASK_GENERAL_TYPES_H */

Ключевым компонентом планировщика, его ядром является программный компонент называемый Limiter. Это такой программный компонент, который не позволит вызывать функцию function чаще чем установлено в конфиге. Например вызывать функцию не чаще чем раз в секунду или не чаще чем раз в 10 ms.

#ifndef LIMITER_TYPES_H
#define LIMITER_TYPES_H

#include <stdbool.h>
#include <stdint.h>

#include "data_types.h"

typedef bool (*TaskFunc_t)(void);

typedef struct {
    bool init;
    bool on_off;
    uint32_t call_cnt;
    uint64_t start_time_next_us;
    U64Value_t duration_us;
    U64Value_t start_period_us;
    uint64_t run_time_total_us;
    uint64_t start_time_prev_us;
    TaskFunc_t function;
} Limiter_t;

#endif /* LIMITER_TYPES_H */

Вот API планировщика. Механизм очень прост. Limiter измеряет время с момента подачи питания up_time_us, смотрит на расписание следующего запуска start_time_next_us и, если текущее время (up_time_us) больше времени запуска, назначает следующее время запуска и запускает задачу (limiter_task_frame).

bool inline limiter(Limiter_t* const Node, uint32_t period_us, uint64_t up_time_us) {
    bool res = false;
    if(Node->on_off) {
        if(Node->start_time_next_us < up_time_us) {
            Node->start_time_next_us = up_time_us + period_us;
            res = limiter_task_frame(Node);
        }

        if(up_time_us < Node->start_time_prev_us) {
            LOG_DEBUG(LIMITER, "UpTimeOverflow %llu", up_time_us);
            Node->start_time_next_us = up_time_us + period_us;
        }
        Node->start_time_prev_us = up_time_us;
    }
    return res;
}

Limiter также ведёт аналитику. Измеряет время старта и окончания задачи, вычисляет продолжительность исполнения задачи (duration), вычисляет минимум (run_time.min) и максимум (duration.max), суммирует общее время, которое данная задача исполнялась на процессоре (run_time_total).

static inline bool limiter_task_frame(Limiter_t* const Node) {
    bool res = false;
    if(Node) {
        uint64_t start_us = 0;
        uint64_t stop_us = 0;
        uint64_t duration_us = 0;
        uint64_t period_us = 0;

        start_us = limiter_get_time_us();

        if(Node->start_time_prev_us < start_us) {
            period_us = start_us - Node->start_time_prev_us;
            res = true;
        } else {
            period_us = 0; /*(0x1000000U + start) - TASK_ITEM.start_time_prev; */
            res = false;
        }

        Node->start_time_prev_us = start_us;
        if(res) {
           	data_u64_update(&Node->start_period_us, period_us);
        }
        

        res = true;
#ifdef HAS_FLASH
        res = is_flash_addr((uint32_t)Node->function);
#endif /*HAS_FLASH*/
        if(res) {
            Node->call_cnt++;
            res = Node->function();
        } else {
            res = false;
        }

        stop_us = limiter_get_time_us();

        if(start_us < stop_us) {
            duration_us = stop_us - start_us;
            res = true;
            data_u64_update(&Node->duration_us, duration_us);
            Node->run_time_total_us += duration_us;
        } else {
            duration_us = 0;
            res = false;
        }
    }
    return res;
}

Стоит заметить, что перед непосредственным запуском конкретной задачи Limiter может проверить, что указатель на функцию в самом деле принадлежит Nor-Flash памяти микроконтроллера.

В основном супер цикле достаточно только перечислить те задачи, которые будут исполняться. Вот функция одной итерации супер-цикла.


bool inline tasks_proc(uint64_t loop_start_time_us){
    bool res = false;
    uint32_t cnt = task_get_cnt();
    uint32_t t = 0;
    for (t=0; t<cnt; t++) {
        if(TaskInstance[t].limiter.on_off) {
            res = limiter(&TaskInstance[t].limiter, TaskInstance[t].period_us, loop_start_time_us);
        }
    }
    return res;
}

bool super_cycle_iteration(void) {
    bool res = false;
    if(SuperCycle.init) {
        SuperCycle.spin_cnt++;
        res = true;
        SuperCycle.run = true;
        SuperCycle.start_time_us = time_get_us();
        LOG_DEBUG(SUPER_CYCLE, "Proc %f Spin:%u", USEC_2_SEC(SuperCycle.start_time_us),SuperCycle.spin_cnt);
        if(SuperCycle.prev_start_time_us < SuperCycle.start_time_us) {
        	SuperCycle.error++;
        }
      
        SuperCycle.duration_us.cur = (uint32_t)(SuperCycle.start_time_us - SuperCycle.prev_start_time_us);
        SuperCycle.duration_us.min = (uint32_t)MIN(SuperCycle.duration_us.min, SuperCycle.duration_us.cur);
        SuperCycle.duration_us.max = (uint32_t)MAX(SuperCycle.duration_us.max, SuperCycle.duration_us.cur);
       
        super_cycle_check_continuity(&SuperCycle, loop_start_time_us);

        tasks_proc(SuperCycle.start_time_us);
      
        SuperCycle.prev_start_time_us = SuperCycle.start_time_us;
    }

    return res;
}


Вот код запуска супер цикла. Из функции super_cycle_start и будет исполняться вся прошивка, за исключением вызова обработчиков прерываний ISR.

_Noreturn void super_cycle_start(void) {
    LOG_INFO(SUPER_CYCLE, "Start");
    super_cycle_init();

    SuperCycle.start_time_ms = time_get_ms();
    LOG_INFO(SUPER_CYCLE, "Started, UpTime: %u ms", SuperCycle.start_time_ms);
  
    for(;;) {
    	super_cycle_iteration();
    }
}

Такая сформировалась зависимость между программными компонентами данного планировщика.

Отладка планировщика

Очевидно, что надо как-то наблюдать за работой планировщика. Для этого планировщик и были разработан, чтобы снимать метрики. Для этого можно воспользоваться интерфейсом командной строки CLI поверх UART.

В данном скриншоте можно замерить, что больше всего процессорного времени потребляет задача DASHBOARD (приборная панель). Тут же видно, что были такие итерации супер цикла, что задача DASHBOARD непрерывно исполнялась аж 0.33 сек!

Можно измерить период с которым вызывалась каждая из задач и сопоставить с конфигом для каждой задачи. Тут видно, что в среднем реже всего вызывается задача FLASH_FS (менеджер файловой системы). Одновременно драйвер светодиода LED_MONO отрабатывает c частотой (44 Hz). А чаще всего происходит опрос DecaDriver(а).

Тут заметно даже, что накладные расходы на данный планировщик составляют 47.9% по времени. Это лишь потому, что в обработчиках самих задач не происходит пока никакой обработки. Прошивка работает вхолостую. Не было прерываний и флаги не устанавливаются. Не приходят пакетов в интерфейсы. Ничего не происходит.

Также высокое процентное значение SchedulerOverhead это признак того, что периоды у всех задач большие и процессору нечего делать. А значит можно смело добавлять в супер цикл больше кооперативных задач или уменьшать периоды у нынешних задач.

Анализируя эти ценнейшие метрики данного импровизированного планировщика можно принимать решения по оптимизации кода всего проекта. Получился своеобразный Code Coverage.

Достоинства данного планировщика

1--Простота, очевидность, прозрачность, мало кода.

2--Можно вычистить процент загрузки процессора по каждой задаче.

3--Переносимость. Можно его прокручивать хоть на микроконтроллере, хоть на BareBone потоке в RTOS, хоть в консольном приложении на LapTop PC.

4--Приоритет задачи задается периодом её запуска. Чем ниже период, тем выше приоритет.

5--Легко масштабировать прошивку. Просто добавляем новые строчки в super цикл.

6--Можно весь этот планировщик вообще реализовать на функциях препроцессора. И тогда не будет накладных расходов на запуск функций. Однако так будет невозможно осуществлять пошаговую отладку программы.

7--Можно переназначать функции для узлов планировщика и таким образом перепрограммировать устройство далеко в Run-Time.

Недостатки данного планировщика

1--Если одна задача зависла, то считай что зависли все остальные задачи.

2--Надо проектировать задачи так, чтобы они что-то делали за один прогон и не тратили много времени внутри себя. Например переключили состояние конечного автомата и вышли. Совсем не здорово, если какая-то задача начнет расшифровывать 150kByte KeePass файл внутри общего супер цикла или вычислять обратную матрицу 100x100. У Вас перестанет мигать Heart Beat LED, перестанет отвечать CLI и пользователь будет с полной уверенностью считать, что прошивка просто взяла и зависла! А на самом деле программа через 57 секунд снова воспрянет.

3--Требуются накладные расходы (в виде процессорного времени) для вычисления метрик за которыми следит Limiter. Но это не такая и большая проблема, так как отладочные метрики можно включать или исключать на стадии препроцессора #ifdef(ами).

Вывод

Вот и Вы умеете делать кооперативный планировщик. Супер цикл это не такая уж и плохая вещь. Его можно отлично использовать и в RTOS прошивках. Есть код которому точно нужен RTOS. Это BLE/LwIP стек, однако всё остальное: LED, Button может отлично работать в пределах супер цикла в отдельном BareBone потоке. Благодаря супер циклу вы сэкономите на переключении контекста. Надеюсь, что этот текст поможет кому-нибудь писать прошивки и оценивать нагрузку на процессор.

Словарь

Акроним

Расшифровка

1

ISR

Interrupt Service Routine

2

RTOS

real-time operating system

3

UART

Universal asynchronous receiver/transmitter

4

CLI

command-line interface

5

API

Application Programming Interface

Links

Контрольные вопросы:

1--В какую сторону в ARM Cortex-M4 считает Sys Tick таймер?

2--Как измерить загруженность процессора в NoRTOS прошивке?

3--Сколько тактов процессора нужно для вызова Си-функции на микропроцессоре ARM Cortex-M4?

4--Сколько тактов процессора нужно для вызова обработчика прерываний на микропроцессоре ARM Cortex-M4?

Комментарии (25)


  1. MikeKozlovAVR
    16.11.2023 03:15
    +2

    Было дело, я как то писал простого планировщика под ардуино с похожим принципом работы, только задачам при их создании не задавался интервал времени, а задавался только приоритет. Частота вызова задачи зависит от неё самой, то есть от вызова sleep() или yield() и т.д.. Также можно в процессе работы задачи изменить её приоритет. А планировщик раскидывает очередь выполнения задач в зависимости от их состояния и приоритетов.

    https://github.com/MikeKozlovAVR/Arduino_MultiTasker


  1. Zuy
    16.11.2023 03:15
    +6

    FreeRTOS вполне работает на PC на базе родных потоков. Я работал над проектами использующими FreeRTOS, где целевым устройством был микроконтроллер на PowerPC, а отладка так же велась на PC.


    1. aabzel Автор
      16.11.2023 03:15
      +1

      Я работал над проектами использующими FreeRTOS, где целевым устройством был микроконтроллер на PowerPC

      А что это был конкретно за PowerPC микроконтроллер? Случайно не этот

      https://www.st.com/en/automotive-microcontrollers/spc58nn84e7.html

      SPC58NN84E7RMHBR, ядро e200z4, 200Mhz, 32bit, Flash:6576 MByte, SRAM: 128 KByte, QFP 177pins, 3x cores, Arch: Harvard, D-Cache: 8kByte, big endian?


      1. Zuy
        16.11.2023 03:15
        +1

        Что-то из серии MPC57xx. Тогда это еще был Freescale. Чуть позже они же стали NXP.


  1. VladimirFarshatov
    16.11.2023 03:15
    +3

    В свое время делал так: https://community.alexgyver.ru/threads/programmirovanie-konechnyx-avtomatov-bez-delay.2657/

    Ещё где-то был вариант вытесняющей работы на прерывании от watchdog, а не кооперативный.


  1. tminnigaliev
    16.11.2023 03:15
    +4

    Всё-таки думается, что для кооперативного планировщика очень нужна функция yield, которую можно внутри задачи вызвать, чтоб вернуть управление планировщику и уступить процессор другой задаче. Без этого не торт :(


    1. VladimirFarshatov
      16.11.2023 03:15
      +2

      Зачем?

      Если это набор конечных автоматов, которые чем-то там управляют, то включи/выключил, сменил состояние. Всё, работа этого шага завершена. Если это некий сьем показаний, то .. а в общем-то тоже самое - снял, сложил в очередь на обработку и .. всё, ждем активации заново.

      Сложный и долгий числодробильный алгоритм, который надо прервать, передать управление системе? Так опять же это не задача для yield() .. Суть этой функции - вызвать нечто, когда задача не знает что делать (ожидает ввода-вывода, замера и т.д.) но и отдавать управление не хочет, а простаивать - плохо.. Решается реорганизаций архитектуры: разбиваем задачу на отдельные конечные автоматы (они там есть) и работаем с ними как обычно.

      YAGNI.


      1. aabzel Автор
        16.11.2023 03:15

        Подписываюсь под каждым Вашим словом.


  1. abutorin
    16.11.2023 03:15
    +1

    из RTOS для МК есть еще https://github.com/scmrtos/scmrtos


  1. solderman
    16.11.2023 03:15
    +1

    Для еспЭшек удобным оказалось применять библиотеку Ticker (в ардуинах можно неблокирующим миллисом хотя есть и библиотеки, аналогичные тикеру) . Определяя тикером тайминг каждого процесса я выставляю по времени флаг запроса на обслуживание внутри вектора, а обслуживаю процесс и сбрасываю флаг уже в основном цикле по порядку. В общем то можно и очередь с приоритетами организовать но мне это не требуется. Обслуживаемые процессы работают с массивным тепловым оборудованием и высокая частота обслуживания не требуется. Самый частый процесс - сбор данных с датчиков и актуализация сформированных состояний исполнительных устройств - раз в одну секунду.


  1. Barma2012
    16.11.2023 03:15
    +1

    Пожалуйста, добавьте в опрос " Какую RTOS вы использовали при программировании микроконтроллеров?" вариант "Свою самописную".


    1. mlnw
      16.11.2023 03:15
      +1

      Думаю, понятно, что это вариант большинства. Вытесняющая многозадачность, приоритеты, виртуальная память и контексты доступа для МК, выполняющего конкретную узкоспециализированную задачу редко или вообще не нужны. А написать простенький велосипед с очередью задач и планировщиком на прерываниях системного таймера - дело пары часов.


  1. kipar
    16.11.2023 03:15
    +1

    когда для писал для pic16 делал просто главный цикл из которого вызывал всё. Ну да, клавиатуру можно не опрашивать так часто как датчик позиции, но если мы не упираемся в производительность процессора то почему бы и не опросить.

    Но теперь на freertos делаю - намного удобнее писать логику не конечными автоматами а в явном виде. Послать запрос, подождать ответа, если чексумма сошлась обработать, повторить. Из минусов freertos разве что тики не быстрее 1мс, но это легко решается - запускаю таймер на 100мкс, в нем выставляю семафор, а в задаче вместо vTaskDelay делаю ожидание семафора.


    1. AVKinc
      16.11.2023 03:15
      +2

      А вот если опрашивать клавиатуру раз в 200мсек то не нужно отслеживать дребезг. А если функция вызывается через строгое время то этим вполне можно внутри функции пользоваться для отсчета времени ))


      1. DungeonLords
        16.11.2023 03:15
        +2

        Ну и поймаете вместо дребезга (часто меняющегося значения) неверное значение, потому что опрос с периодом 200ms выпал неудачно на время дребезга...


        1. aabzel Автор
          16.11.2023 03:15
          +1

          Тогда надо пропускать отчёты измерений с кнопки через цифровой fir фильтр.


      1. xSVPx
        16.11.2023 03:15
        +2

        0.2с ? Мне казалось я быстрее клавиши нажимаю, т.е. просто будете пропускать нажатия...


        1. Mike-M
          16.11.2023 03:15
          +1

          Видимо, автор допустил опечатку: вместо 200мсек подразумевал 20 мс. Именно такой интервал обычно выбирают для подавления дребезга контактов.


  1. gev
    16.11.2023 03:15
    +2

    Проголсовал за пункт "супер-цикл, который прокручивает конечные автоматы + прерывания"

    В своем планировщике кроме периода выполнения таски добаввил еще и фазу. Так можно таски распределять блолее равномерно, плюс можно легко делать связанные таски, выполняющиеся с равным периодом, но с задержекой друг от друга. Например при опросе датчиков по OneWire, одна таска отпрравляет команду на измерение, а другая через несколько милисекунд считывает показания и обе выполняются раз в несколько секунд.

    PS мой планировщик:

    mkLoop :: SystemClock -> [Task] -> Def ('[] :-> ())
    mkLoop systemClock tasks = proc "loop" $ body $ do
        let (scheduled, immediately) = partition (isJust . period) tasks
        clocks <- replicateM (length scheduled) (local (ival 0))
        forever $ do
            t <- getSystemTime systemClock
            zipWithM_ (run t) clocks scheduled
            mapM_ runTask immediately
        where
            run t1 clock task = do
                t0 <- deref clock
                let Period interval phase = fromJust $ period task
                when (t1 - t0 >=? interval + phase) $ do
                    runTask task
                    store clock $ t1 - phase

    И пример тасков:

    addTask $ delay      15_000       (name <> "_search"             ) $ searchDevices      
    addTask $ delayPhase 15_000 6_000 (name <> "_measure_temperature") $ measureTemperature 
    addTask $ delayPhase 15_000 6_700 (name <> "_get_temperature"    ) $ getTemperature     
    


    1. aabzel Автор
      16.11.2023 03:15

      Гениально! Управлять фазой запуска задач.


      1. gev
        16.11.2023 03:15
        +1

        Это я подсмотрел вот здесь: https://copilot-language.github.io/
        Там алгоритмы пишутся как обработка виртуальных бесконечных потоков изменения состояния системы. И таймер реализуется как раз периодом и фазой


    1. aabzel Автор
      16.11.2023 03:15

      Я так понял, пример кода на Rust. Верно?


      1. gev
        16.11.2023 03:15
        +1

        Это Haskell =)


        1. aabzel Автор
          16.11.2023 03:15

          Я время от времени сравниваю языки программирования.
          Если не сложно, то я был бы признателен за заполнение строчки про Haskell 
          https://docs.google.com/spreadsheets/d/1GJQqpEBGsIMhReNVLeo6LmvwbcFg2ttXLAXgHXlaqjQ/edit#gid=0


          1. gev
            16.11.2023 03:15
            +1

            Заполнил!