*иногда хочется уйти от управляемых будней

Корутины — это функции, которые могут приостанавливать своё выполнение и возобновлять его позже, сохраняя своё состояние между вызовами. Это позволяет выполнять несколько задач одновременно без необходимости создания отдельных потоков или процессов.

для привлечения внимания от топ сео
для привлечения внимания от топ сео

И тут начинается обычное бла-бла про как писать енти самые короутины на языке, проблемы и костыли для их решения.

Но нет, цель статьи, копнуть в сторону «как оно там устроено» на уровне близком к железу.  В качестве примера железа возьмем Arduino с 1 потоком.

Начнем издалека, начнем c Duff's Device

Том работал над вопросом как ускорить, казалось бы, простой код.

send(to, from, count)
	register short *to, *from;
	register count;
	{
		do{
			*to = *from++;
        }while(--count>0);
	}

Естественным ходом было думать в сторону снижения количества итераций в while. Потому что… описание всего этого займет много и долго.  В общем, если посмотреть наиболее глупым образом то, в сгенерированном ассемблере слишком много переходов, как раз из-за while.  Надо только заменить цикл от 0 до n строками вида *to = *from++; (запахло векторизацией, но у нас всего-то чип arduino, поэтому забудем это умное слово на время). И тогда Тому пришла гениальная идея, а что, если я просто поменяю сравнения на goto (звуки фу и закатывания глаз). Но используя интересную фичу С компилятора switch case, это как syntax sugar вокруг GOTO, то есть вы свободно и не парясь можете писать код логически разорванный этими переходами и он скомпилируется. И вот что у него вышло:

send(to, from, count)
	register short *to, *from;
	register count;
	{
		register n=(count+7)/8;
		switch(count%8){
		case 0:	do{	*to = *from++;
		case 7:		*to = *from++;
		case 6:		*to = *from++;
		case 5:		*to = *from++;
		case 4:		*to = *from++;
		case 3:		*to = *from++;
		case 2:		*to = *from++;
		case 1:		*to = *from++;
			}while(--n>0);
		}
	}

Как он сам написал: It amazes me that after 10 years of writing C there are still little corners that I haven't explored fully.  (Actually, I have another revolting way to use switches to implement interrupt driven state machines but it's too horrid to go into.) (думаю перевод не нужен)

Итого он значительно снизил количество проверок и переходов, и смог разогнать код. Кроме того, это код показателен для управления состояниями.

Реализация короутин с простейшим Round Robin

Давайте рассмотрим реализацию короутин с простейшим Round Robin. За малым исключением мы не будем усложнять дело, управление квантованием времени и т.п. откинем, а возьмем только: Циклический порядок (задачи обрабатываются в порядке их поступления и без возможности прерывания без спец синтаксиса. После завершения времени выполнения задачи, она помещается в конец очереди, и процессор переходит к следующей задаче).

Ниже будем для простоты называть такое потоками.

Возвращаемся к коду машины Duff-а, пусть у нас есть 2 потока. 1 поток пишет "привет" и ждет пока второй поток поставит переменную global на 1. А второй ждет, когда tickcount увеличиться на 50 и поставит global равным 1.

Попробуем это сделать на псевдо С.

  1. нам нужно как-то понимать куда мы должны возвращаться. У нас есть уникальные ... номера строк.

  2. нам нужно где-то это хранить. Пускай будет структура вида

struct context { unsigned short line_number; };
enum { WAITING, YIELDED, EXITED };

Тогда поток будет выглядеть так:

int thread1(struct context * context) {
    switch(context -> line_number) {
        case 0:;
            printf("Привет\n");
            context -> line_number  = __LINE__;  // Запоминаем текущую строку
            case __LINE__: //при следующем заходе switch сможет перекинуть сюда
            if (global !=1){  // Проверяем условие
                return WAITING;  // Возвращаемся, если условие не выполнено
            }
            printf("Мир\n");
}//закрываем switch 
            context -> line_number // Обнуляем line_number в конце
 return EXITED;
}

Поток 2:

int thread2(struct context * contex, int *start_tick_count) {
    switch(context -> line_number) {
        case 0:;
            context -> line_number  = __LINE__;  // Запоминаем текущую строку
            case __LINE__: //при следующем заходе switch сможет перекинуть сюда
            printf("waiting");
            if ((current_ticks() - *start_tick_count) < 50){  // Проверяем условие
                return WAITING;  // Возвращаемся, если условие не выполнено
            }
    }//закрываем switch 
    context -> line_number // Обнуляем line_number в конце
    return EXITED;
}

И наш супер продвинутый round robin scheduler:

int global = 0;
int start_ticks=0;
void main(…){
  context c1;
  context c2;
  start_ticks = current_ticks();
  while(true){
  	thread1(c1);
    thread2(c2, & start_ticks)
  }
}

Результат работы будет “привет”, несколько раз “waiting” и “мир”

Как это работает: внутри функций, перед выходом return WAITING; сохраняется состояние, куда нужно вернутся. При следующем вызове switch просто перекинет на нужный нам case LINE и продолжит выполнение.

Осталось обернуть это все в макросы, и мы изобрели простые легковесные короутины которые подойдут даже для arduino. Но… за нас это все уже сделали.

Protothreads

https://dunkels.com/adam/pt/ великолепный по своей простоте проект.

Давайте попробуем написать немного кода для Arduino, как если бы это было многопоточной системой.

Идея простая: крутим колеса пока датчик сонара не покажет, что расстояние меньше 25 см. тогда пытаемся повернуть пока расстояние не увеличится.

Итого нам нужно 3 короутины

      1. Читает датчик сонар расстояние

static PT_THREAD(read_distance(struct pt *pt)) {
    PT_BEGIN(pt);

    while(1) {
        int currentDistance = sonarForward.ping_cm();
        if(currentDistance == 0) {
            currentDistance = 100;
        }
#ifdef DEBUG_DISTANCE
        printDistance(currentDistance);
#endif


        distance = currentDistance;
        PT_SLEEP(pt, 100);
    }

    PT_END(pt);
}
  1. Двигается вперед пока расстояние до обьекта больше 25 см

static PT_THREAD(move_forward(struct pt *pt)) {
    PT_BEGIN(pt);

    while(1) {
        PT_WAIT_UNTIL(pt, distance > MIN_DISTANCE_FORWARD);
        printText("forward");
#ifndef EMULATION
        motor1.run(FORWARD);
        motor2.run(FORWARD);
        motor1.setSpeed(200);
        motor2.setSpeed(200);
#endif
        PT_SLEEP(pt, 100);
    }

    PT_END(pt);
}
  1. Разворачивает если расстояние меньше 25 см

static PT_THREAD(try_rotate(struct pt *pt)) {
    PT_BEGIN(pt);

    while(1) {
        PT_WAIT_UNTIL(pt, distance < MIN_DISTANCE_FORWARD);
        printText("rotate");
#ifndef EMULATION
        motor1.run(BACKWARD);
        motor2.run(FORWARD);
        motor1.setSpeed(150);
        motor2.setSpeed(150);
#endif
        PT_SLEEP(pt, 100);
    }

    PT_END(pt);
}

Полный код примера

Для переносимости решения, например, на phreads можно обернуть вызовы в похожие макросы, для примера:

#define PT_THREAD(name) void *name(void *arg)

#define PT_WAIT_UNTIL(pt, condition) \
    do { \
        if (!(condition)) { \
            (pt)->condition = false; \
            pthread_cond_wait(&(pt)->cond, &(pt)->mutex); \
        } \
    } while (0)

И тогда даже получится переключатся между режимами работы из короутин в многопоточную… но это уже совсем другая история.

*иногда хочется уйти от управляемых будней
*иногда хочется уйти от управляемых будней

Комментарии (7)


  1. Zolg
    06.09.2024 20:16
    +1

    имхо, если цель

    копнуть в сторону «как оно там устроено» на уровне близком к железу

    то нужно копать не в сторону async/await и прочей новомодной "асинхронности" а в сторону старой доброй cooperative multitasking. Ибо под копотом по сути (а для однопроцессорных/ядерных систем и по факту) она и есть.


    1. aamonster
      06.09.2024 20:16
      +2

      Классическая кооперативная многозадачность требует сохранения стека, подход со стейт машиной, как в PT, ощутимо дешевле (за что приходится платить ручным управлением контекстами).


    1. kain64b Автор
      06.09.2024 20:16

      Думал про это, но uno даёт слишком мало ресурсов. Да и цель была переписать код робота на что-то понятное, понятное после дня работы java/c#. А так да, вы правы. Надо попробовать.


  1. kenomimi
    06.09.2024 20:16
    +8

    И эти люди запрещают мне писать goto...
    И эти люди запрещают мне писать goto...


  1. aamonster
    06.09.2024 20:16
    +1

    Как только вы всё состояние "треда" ручками вынесли в отдельный объект – остальное уже халява.

    В этом плане очень интересно сравнить три разных подхода: полноценные треды (неважно, с кооперативной многозадачностью или вытесняющей: основное – отдельный стек и сохранение состояния CPU при переключении), реализацию async/await в C# (мощная поддержка от компилятора, выделяющая состояние и преобразующая функции в state-машину) и реализацию промисов в js с мощным использованием замыканий.

    Понятно, что варианты C# и JS на ардуино не подойдут, так как сильно опираются на автоматическое управление памятью, но понимать здОрово.

    ЗЫ: Ещё есть подход Swift, но я его пока так и не понял.


  1. kovserg
    06.09.2024 20:16
    +1

    Для arduino же подход гораздо проще чем корутины: setup / loop
    Первая функция вызывается при инициализации, а вторая "постоянно" либо с постоянной частотой (например 1000 раз в сек как в realtime контроллерах). Для постепенно исполняемых функций надо примерно 16 строк:

    loop-fn.h
    #ifndef __LOOP_FN_H__
    #define __LOOP_FN_H__
    
    typedef int loop_t;
    
    #define LOOP_RESET(loop) { loop=0; }
    #if defined(__COUNTER__) && __COUNTER__!=__COUNTER__
    #define LOOP_BEGIN(loop) { enum { __loop_base=__COUNTER__ }; \
        loop_t *__loop=&(loop); __loop_switch: \
        switch(*__loop) { default: *__loop=0; case 0: {
    #define LOOP_POINT() { enum { __loop_case=__COUNTER__-__loop_base }; \
        *__loop=__loop_case; goto __loop_leave; case __loop_case:{} }
    #else
    #define LOOP_BEGIN(loop) { loop_t *__loop=&(loop); __loop_switch: \
        switch(*__loop){ default: case 0: *__loop=__LINE__; case __LINE__:{
    #define LOOP_POINT() { *__loop=__LINE__; goto __loop_leave; case __LINE__:{} }
    #endif
    #define LOOP_END() { __loop_end: *__loop=-1; case -1: return 0; } \
        }} __loop_leave: return 1; }
    
    #endif
    
    loop-fn-example.c
    #include <stdio.h>
    #include "loop-fn.h"
    
    typedef struct fn1_s {
        loop_t loop;
        int i;
    } fn1_t;
    int fn1_setup(fn1_t *self); /* return 0 if no error */
    int fn1_loop(fn1_t *self); /* return 0 when done */
    
    int fn1_setup(fn1_t *self) {
        LOOP_RESET(self->loop);
        return 0;
    }
    
    int fn1_loop(fn1_t *self) {
        LOOP_BEGIN(self->loop)
        printf("fn1.1\n");
        LOOP_POINT()
        for(self->i=0;self->i<3;self->i++) {
            printf("fn1.2.%d\n",self->i+1);
            LOOP_POINT()
        }
        printf("fn1.3\n");
        LOOP_END()
    }
    
    void test() {
        fn1_t s[1]; int i,r;
        fn1_setup(s);
        for(i=0;i<10;i++) {
            printf("%2d: ",(int)s->loop);
            r=fn1_loop(s); if (r==0) break;
        }
    }
    
    int main(int argc, char** argv) {
        test();
        return 0;
    }
    /* output:
     0: fn1.1
     1: fn1.2.1
     2: fn1.2.2
     2: fn1.2.3
     2: fn1.3
    */
    

    Основные проблемы начинаются дальше, т.к. помимо самой многозадачности надо поддерживать некоторые правила. Без которых будет всё очень печально. И самое главное, что за выделение ресурсов должен отвечать не исполняющий код, а тот кто это код запустил исполняться. То есть если подзадача остановилась, есть возможность освободить все ресурсы которые были выделены в том числе и дочерние подзадачи. То есть структурная многозадачность должна быть как в erlang. Если исполнителя надо прибить или ограничить в ресурсах то это не вызывает UB или головной боли. Из плюсов простота и есть полный контроль над происходящим и можно сохранять состояние на диск или передавать по сети. Никакие корутины таким похвастаться не могут. И еще если потоков миллионы то задержки и ожидания делаются более хитрым способом.


    1. AVKinc
      06.09.2024 20:16

      Надо писать так, что бы задача не могла остановиться, в случае шухера она должна вываливаться с ошибкой вот и все. Это же не ядро линукса, можно так написать. И соответственно надо иметь диспетчер запуска, глупо запускать все задачи в лупе. Что то запускается раз в 100мс, что то раз в минуту. Это элементарные правила кодинга под контроллеры.