Изученая возможности MicroPython для своих целей натолкнулся на одну из реализаций библиотеки asyncio и, после недолгой переписки с Piter Hinch — автором библиотеки, понял, что мне необходимо глубже разобраться с принципами, базовыми понятиями и типичными ошибками использования методов асинхронного программирования. Тем более, что раздел для начинающих — как раз для меня.

Это руководство предназначено для пользователей, имеющих разный уровень опыта работы с asyncio, в том числе содержит специальный раздел для начинающих.

Содержание
0. Введение
0.1.___Установка uasyncio на пустое устройство (hardware)
1. Планирование совместного исполнения программ
1.1.___Модули
2. Библиотека uasyncio
2.1.___Структура программы: цикл обработки событий
2.2.___Сопрограммы
2.2.1.______Постановки в очередь сопрограмм для участия в планировании
2.2.2.______Запуск обратного вызова функции (callback)
2.2.3.______Примечания: сопрограммы как связанные методы. Возвращаемые значения.
2.3.___Задержки
3. Синхронизация и ее классы
3.1.___Блокировка Lock
3.1.1.______Блокировки и тайм-ауты
3.2.___Событие Event
3.2.1.______Значение события
3.3.___Барьер Barrier
3.4.___Семафор Semaphore
3.4.1.______Ограниченный Семафор
3.5.___Очередь Queue
3.6.___Другие Классы синхронизации
4. Разработка классов для asyncio
4.1.___Классы с использованием ожидания await
4.1.1.______Использование в менеджерах контекста
4.1.2.______Await в сопрограмме
4.2.___Асинхронные итераторы
4.3.___Асинхронные менеджеры контекста
5. Исключения от тайм-аутов и из-за отмены задач
5.1.___Исключения
5.2.___Исключения вследствие тайм-аутов и из-за отмены задач
5.2.1.______Отмена задач
5.2.2.______Сопрограммы с таймаутами
6. Взаимодействие с аппаратными устройствами
6.1.___Проблемы синхронизации
6.2.___Опрос устройств с помощью сопрограмм
6.3.___Использование потокового механизма
6.3.1.______Пример драйвера UART
6.4.___Разработка драйвера для потокового (Stream) устройства
6.5.___Полный пример: aremote.py Драйвер для приемника ИК-пульта дистанционного управления.
6.6.___Драйвер для датчика температуры и влажности HTU21D.
7. Советы и подсказки
7.1.___Программа зависает
7.2.___uasyncio сохраняет состояние
7.3.___Сборка мусора
7.4.___Тестирование
7.5.___Распространенная ошибка. Это может быть трудно найти.
7.6.___Программирование с использованием сокетов (sockets)
7.6.1.______Проблемы с WiFi
7.7.___Аргументы конструктора цикла событий
8. Примечания для начинающих
8.1.___Проблема 1: циклы событий
8.2.___Проблема 2: методы блокировки
8.3.___Подход uasyncio
8.4.___Планирование в uasyncio
8.5.___Почему совместное, а не потоковое планирование (_thread)?
8.6.___Взаимодействие
8.7.___Опрос (polling)

0. Введение

Большая часть этого документа предполагает некоторое знакомство с асинхронным программированием. Для новичков введение можно найти в разделе 7.

Библиотека uasyncio для MicroPython включает подмножество asyncio библиотеки Python и предназначена для использования на микроконтроллерах. Как таковая она занимает небольшой объем оперативной памяти и настроена на быстрое переключение контекста с нулевым распределением оперативной памяти.

Этот документ описывает использование uasyncio с акцентом на создание драйверов для аппаратных устройств.

Цель состоит в том, чтобы спроектировать драйверы таким образом, чтобы приложение продолжало работать, пока драйвер ожидает ответа от устройства. При этом приложение остается чувствительным к другим событиям и взаимодействию с пользователем.

Еще одна важная область применения asyncio — сетевое программирование: в Интернете можно найти достаточно информации по этой теме.

Обратите внимание, что MicroPython основан на Python 3.4 с минимальными дополнениями Python 3.5. За исключением случаев, подробно описанных ниже, функции asyncio версий старше 3.4 не поддерживаются. Данный документ определяет функции, поддерживаемые в этом подмножестве.

Цель этого руководства — представить стиль программирования, совместимый с CPython V3.5 и выше.

0.1 Установка uasyncio на пустое устройство (hardware)

Рекомендуется использовать прошивку MicroPython V1.11 или новее. На многих платформах установка не требуется, так как uasyncioо уже скомпилирована в сборке. Для проверки достаточно набрать в REPL

import  uasyncio

Следующие инструкции охватывают случаи, когда модули не установлены предварительно. Модули queues и synchro не являются обязательными, но необходимы для выполнения приводимых здесь примеров.

Устройство, подключенное к интернету

На устройстве, подключенном к Интернету и работающем встроенном программном обеспечении V1.11 или более поздней версии можно выполнить установку с использованием встроенной версии upip. Убедившись, что устройство подключено к вашей сети:

import upip

upip.install ( 'micropython-uasyncio' )  
upip.install ( 'micropython-uasyncio.synchro' )
upip.install ( 'micropython-uasyncio.queues' )

Сообщения об ошибках от upip не слишком полезны. Если вы получили непонятную ошибку, лишний раз проверьте подключение к Интернету.

Аппаратное обеспечение без подключения к интернету (micropip)

Если на устройстве отсутствует подключение к Интернету (например, Pyboard V1.x), проще всего запустить на компьютере установку micropip.py в каталог по вашему выбору, а затем скопировать результирующую структуру каталогов на целевое устройство. Утилита micropip.py работает под Python 3.2 или выше и работает под управлением Linux, Windows и OSX. Подробнее информацию можно найти здесь.

Типичный вызов:

$ micropip.py install -p ~/rats micropython-uasyncio
$ micropip.py install -p ~/rats micropython-uasyncio.synchro
$ micropip.py install -p ~/rats micropython-uasyncio.queues

Устройство без подключения к Интернету (копирование исходников)

Если не использовать micropip.py, файлы должны быть скопированы из источника. В следующих инструкциях описывается копирование минимального количества файлов на целевое устройство, а также случай, когда uasyncio, для уменьшения занимаемого объема, необходимо сжать в скомпилированную сборку в виде байт-кода. Для последней версии, совместимой с официальными прошивками, файлы должны быть скопированы с официального сайта micropython-lib.

Клонируйте библиотеку на компьютер командой

$ git clone https://github.com/micropython/micropython-lib.git

На целевом устройстве создайте uasyncio каталог (необязательно в каталоге lib) и скопируйте в него следующие файлы:

• uasyncio/uasyncio/__init__.py
• uasyncio.core/uasyncio/core.py
• uasyncio.synchro/uasyncio/synchro.py
• uasyncio.queues/uasyncio/queues.py


Эти модули uasyncio могут быть сжаты в байткод путем размещения каталога uasyncio и его содержимое в порт каталога modules и перекомпилировав содержимое.

1. Совместное планирование

Техника совместного исполнения нескольких задач широко используется во встроенных системах, что предлагает меньше накладных расходов, чем потоковое (_thread) планирование, избегая многих ловушек, связанных с действительно асинхронными потоками выполнения.

1.1 Модули

Ниже приведен перечень модулей, которые могут исполняться на целевом устройстве.

Библиотеки

1. asyn.py Обеспечивает примитивы синхронизации Lock, Event, Barrier, Semaphore, BoundedSemaphore, Condition, gather. Обеспечивает поддержку отмены задач через классы NamedTask и Cancellable.

2. aswitch.py Представляет классы для сопряжения переключателей и кнопок, а также программный объект с возможностью повторной задержки. Кнопки представляют собой обобщение переключателей, обеспечивающих логическое, а не физическое состояние, а также события, вызываемые двойным и длительным нажатием.

Демонстрационные программы

Первые две наиболее полезны, так как они дают видимые результаты при доступе к оборудованию Pyboard.

  1. aledflash.py Мигает четырьмя индикаторами Pyboard асинхронно в течение 10 секунд. Простейшая демонстрация uasyncio. Импортируйте ее для запуска.
  2. apoll.py Драйвер устройства для акселерометра Pyboard. Демонстрирует использование сопрограммы для опроса устройства. Работает в течение 20 с. Импортируйте его для запуска. Требуется Pyboard V1.x.
  3. astests.py Тестовые / демонстрационные программы для модуля aswitch.
  4. asyn_demos.py Простые демонстрации отмены задач.
  5. roundrobin.py Демонстрация кругового планирования. Также эталон планирования производительности.
  6. awaitable.py Демонстрация класса с ожиданием. Один из способов реализации драйвера устройства, который опрашивает интерфейс.
  7. chain.py Скопировано из документации по Python. Демонстрация цепочки сопрограмм.
  8. aqtest.py Демонстрация класса Queue библиотеки uasyncio.
  9. aremote.py Пример драйвера устройства для ИК-протокола NEC.
  10. auart.py Демонстрация потокового ввода-вывода через Pyboard UART.
  11. auart_hd.py Использование Pyboard UART для связи с устройством с использованием полудуплексного протокола. Подходит для устройств, например, использующих набор команд модема «AT».
  12. iorw.py Демонстрация драйвера устройства чтения/записи с использованием механизма потокового ввода-вывода.

Тестовые программы

  1. asyntest.py Тесты для классов синхронизации в asyn.py .
  2. cantest.py Тесты отмены задания.

Утилита

1. check_async_code.py Утилита написана на Python3для обнаружения конкретных ошибок кодирования, которые может быть трудно найти. См. раздел 7.5.

Контроль

Каталог benchmarks содержит сценарии для проверки и характеристик планировщика uasyncio.


2. Библиотека uasyncio

Концепция asyncio основана на организации планирования совместного исполнения нескольких задач, которые в этом документе называются сопрограммами.

2.1 Структура программы: цикл событий

Рассмотрим следующий пример:

import  uasyncio  as  asyncio

async  def  bar ():
      count  =  0,
      while  True :
          count  + =  1
          print ( count )
          await  asyncio.sleep ( 1 )   # пауза 1с

loop  =  asyncio.get_event_loop ()
loop.create_task ( bar ()) # Запланировать как можно скорее
loop.run_forever ()

Выполнение программы продолжается до вызова loop.run_forever. На этом этапе выполнение контролируется планировщиком. Строка после loop.run_forever никогда не будет выполнена. Планировщик исполняет код bar потому что он был помещен в очередь планировщика loop.create_task. В этом тривиальном примере есть только одна сопрограмма bar. Если бы были другие, планировщик исполнял бы их в периоды, когда bar была приостановлена.

Большинство встроенных приложений имеют непрерывный цикл обработки событий. Цикл событий также может быть запущен способом, который разрешает завершение, используя метод цикла событий run_until_complete; это в основном используется в тестировании. Примеры можно найти в модуле astests.py.

Экземпляр цикла событий — это одиночный объект, созданный первым вызовом asyncio.get_event_loop() с двумя необязательными целочисленными аргументами, указывающих количество сопрограмм в двух очередях — стартовавшие и ожидающие. Обычно оба аргумента будут иметь одинаковое значение, равное как минимум числу одновременно исполняемых сопрограмм в приложении. Обычно достаточно значение по умолчанию — 16. Если используются значения не по умолчанию, см. Аргументы конструктора цикла событий ( раздел 7.7.).

Если сопрограмме необходимо вызвать метод цикла события (обычно create_task), вызов asyncio.get_event_loop() (без аргументов) эффективно вернет его.

2.2 Сопрограммы

Сопрограмма создается следующим образом:

async def foo ( delay_secs ):
      await  asyncio.sleep ( delay_secs )
      print ( 'Hello' )

Сопрограмма может позволить другим сопрограммам запускаться с помощью await оператора. Сопрограмма должна содержать хотя бы одно await утверждение. Это побуждает сопрограмму выполняться до завершения, прежде чем выполнение перейдет к следующей инструкции. Рассмотрим пример:

await asyncio.sleep ( delay_secs )
await asyncio.sleep ( 0 )

Первая строка заставляет код приостанавливаться на время задержки, а другие сопрограммы используют это время для своего исполнения. Задержка, равная 0, приводит к тому, что все ожидающие сопрограммы планируются к исполнению в циклическом порядке до исполнения следующей строки. Смотрите пример roundrobin.py.

2.2.1. Очередь для планирования сопрограммы

  • EventLoop.create_task Аргумент: Сопрограмма для запуска. Планировщик ставит сопрограмму в очередь для запуска как можно скорее. Вызов create_task немедленно возвращается. Сопрограмма в аргументе указывается с помощью синтаксиса вызова функции с необходимыми аргументами.
  • EventLoop.run_until_complete Аргумент: Сопрограмма для запуска. Планировщик ставит сопрограмму в очередь для запуска как можно скорее. Сопрограмма в аргументе указывается с помощью синтаксиса вызова функции с необходимыми аргументами. Вызов un_until_complete возвращается, когда сопрограмма завершилась: этот метод обеспечивает способ выхода из планировщика.
  • await Аргумент: Сопрограмма для запуска, указанная с помощью синтаксиса вызова функции. Запускает сопрограмму как можно скорее. Ожидающая сопрограмма блокируется до тех пор, пока одна из ожидаемых сопрограмм не завершится.

Выше cказанное совместимо с CPython. Дополнительные методы uasyncio обсуждаются в Примечаниях ( Раздел 2.2.3.).

2.2.2 Запуск функции обратного вызова (callback)

Обратные вызовы должны быть функциями Python, предназначенными для выполнения за короткий промежуток времени. Это связано с тем, что сопрограммы не будут иметь возможность работать в течение всего времени исполнения такой функции.

Следующие методы класса EventLoop используют обратные вызовы:

  1. call_soon — вызвать как можно скорее. Args: callback обратный вызов для запуска, *args любые позиционные аргументы могут следовать через запятую.
  2. call_later — вызвать после задержки в сек. Args: delay, callback, *args
  3. call_later_ms — вызвать после задержки в мс. Args: delay, callback, *args.

loop  =  asyncio.get_event_loop ()
loop.call_soon ( foo , 5 )     # Запланировать обратный вызов 'foo' как можно скорее с аргументом 5.
loop.call_later ( 2 , foo , 5 ) # запуститься через 2 секунды.
loop.call_later_ms ( 50 , foo , 5 ) # запуститься через 50 мс.

loop.run_forever ()

2.2.3 Примечания

Сопрограмма может содержать return инструкцию с произвольными возвращаемыми значениями. Чтобы получить это значение:

result  =  await  my_coro ()

Сопрограмма может быть ограничена методами и должна содержать хотя бы одно await утверждение.

2.3 Задержки

Есть два варианта для организации задержек в сопрограммах. Для более длительных задержек и тех случаев, когда продолжительность не должна быть точной, можно использовать:

async def foo( delay_secs , delay_ms ):
      await  asyncio.sleep ( delay_secs )
      print ( 'Hello' )
      await  asyncio.sleep_ms ( delay_ms )

Во время таких задержек планировщик будет исполнять других сопрограммы. Это может вносить неопределенность по времени, так как вызывающая сопрограмма будет запущена только тогда, когда будет выполнена та, которая выполняется в текущий момент. Величина задержки зависит от разработчика приложения, но, вероятно, будет порядка десятков или сотен мс; это обсуждается далее во Взаимодействии с аппаратными устройствами (Раздел 6).

Очень точные задержки могут быть исполнены с помощью функций utimesleep_ms и sleep_us. Они лучше всего подходят для коротких задержек, так как планировщик не сможет исполнять другие сопрограммы, пока осуществляется задержка.

3.Синхронизация

Часто возникает необходимость обеспечить синхронизацию между сопрограммами. Распространенный пример — избегать так называемых «условий гонки», когда несколько сопрограмм одновременно требуют за доступ к одному ресурсу. Пример приведен в программе astests.py и обсуждается в документации. Другая опасность — «смертельные объятия», когда каждая сопрограмма ждет завершения другой.

В простых приложениях синхронизация может быть достигнута с помощью глобальных флагов или связанных переменных. Более элегантный подход заключается в использовании классов синхронизации. В модуле asyn.py предложены «микро» реализации классов Event, Barrier, Semaphore и Conditios, предназначеных для использования только с asyncio. Они не являются поточно-ориентированными и не должны использоваться с _thread модулем или обработчиком прерываний, если не указано иное. Так же реализован класс Lock, являющийся альтернативой официальной реализации.

Еще одна проблема синхронизации возникает с сопрограммами-производителями и сопрограммами-потребителями. Сопрограмма-производитель генерирует данные, которые использует сопрограмма-потребитель. Для решения подобных задач asyncio предоставляет класс Queue. Сопрограмма-производитель помещает данные в очередь, в то время как сопрограмма-потребитель ожидает его завершения (с другими операциями, запланированными на время). Класс Queue предоставляет гарантии удаления элементов в том порядке, в котором они были получены. В качестве альтернативы можно использовать класс Barrier, если сопрограмма-производитель должна ждать, пока сопрограмма-потребитель не будет готова получить доступ к данным.

Краткий обзор классов приведен ниже. Более подробно в полной документации.

3.1.Блокировка (Lock)

Lock гарантирует уникальный доступ к общему ресурсу. В следующем примере кода создается экземпляр lock класса Lock, который передается всем клиентам, желающим получить доступ к общему ресурсу. Каждая сопрограмма пытается захватить блокировку, приостанавливая выполнение до тех пор, пока не достигнет успеха:

import uasyncio as asyncio
from uasyncio.synchro import Lock

async def task(i, lock):
    while 1:
        await lock.acquire()
        print("Acquired lock in task", i)
        await asyncio.sleep(0.5)
        lock.release()

async def killer():
    await asyncio.sleep(10)

loop = asyncio.get_event_loop()

lock = Lock()  # The global Lock instance

loop.create_task(task(1, lock))
loop.create_task(task(2, lock))
loop.create_task(task(3, lock))

loop.run_until_complete(killer())  # работать 10s

3.1.1.Блокировки (Lock) и таймауты

На момент написания (5 января 2018 г.) официально разработка uasycio класса Lock не закончена. Если для сопрограммы задан тайм-аут(раздел 5.2.2.), в момент ожидания блокировки при срабатывании тайм-аут будет неэффективным. Он не получит TimeoutError пока не получит блокировку. То же самое относится и к отмене задачи.

Модуль asyn.py предлагает класс Lock, который работает в этих ситуациях. Данная реализация класса менее эффективна, чем официальный класс, но поддерживает дополнительные интерфейсы согласно версии CPython, включая использование менеджера контекста.

3.2 Событие (Event)

Event создает возможность одной или нескольким сопрограммам сделать паузу, пока другая не подаст сигнал об их продолжении. Экземпляр Event становится доступными для всех сопрограмм, использующих его:

import  asyn
event  =  asyn.Event ()

Сопрограмма ждет события, объявив await event, после чего выполнение приостанавливается, пока другие сопрограммы не объявят event.set(). Полная информация.

Проблема может возникнуть если event.set() выдается в циклической конструкции; код должен подождать, пока все ожидающие объекты не получат доступ к событию, прежде чем устанавливать его снова. В случае, когда одна coro ожидает событие, это может быть достигнуто путем получения coro-события, очищающего событие:

async  def  eventwait ( event ):
      await event
      event.clear()

Сопрограмма, инициирующая событие, проверяет, что оно было обслужено:

async  def  foo ( event ):
      while  True :          
          # Получать данные откуда-то
          while event.is_set ():
              await  asyncio.sleep ( 1 ) # Подождите, пока coro ответит на 
         event.set ()

В случае, когда несколько coros ждут синхронизации одного события, задачу можно решить с помощью события подтверждения. Каждой coro нужно отдельное событие.

async def eventwait ( событие , ack_event ):
      await  event
      ack_event.set ()

Пример этого приведен в event_test функции в asyntest.py. Это громоздко В большинстве случаев — даже с одним ожидающим coro — класс Barrier, представленный ниже, предлагает более простой подход.
Событие также может обеспечить средство связи между обработчиком прерываний и coro. Обработчик обслуживает аппаратное обеспечение и устанавливает событие, которое проверяется coro уже в обычном режиме.

3.2.1 Значения события

Метод event.set() может принимать необязательное значение данных любого типа. Coro, ожидающая события, может получить его с помощью event.value(). Обратите внимание, что event.clear() будет установлено в значение None. Типичное использование этого для coro, устанавливающего событие — выпустить event.set(utime.ticks_ms()). Любая coro, ожидающая события, может определить возникшую задержку, например, чтобы выполнить компенсацию за это.

3.3.Барьер (Barrier)

Существует два варианта применения класса Barrier.

Во-первых, он может приостановить сопрограмму до тех пор, пока одина или несколько других сопрограмм не завершатся.

Во-вторых, он позволяет нескольким сопрограммам встречаться в определенной точке. Например, производитель и потребитель могут синхронизироваться в точке, где у производителя есть данные, и потребитель готов их использовать. В момент исполнения Barrierможет выполнить дополнительный обратный вызов до того, как барьер будет снят, и все ожидающие события могут продолжаться.

Обратный вызов может быть функцией или сопрограммой. В большинстве приложений, скорее всего, будет использоваться функция: она может быть гарантированно выполнена до завершения, прежде чем барьер будет снят.

Примером является функция barrier_test в asyntest.py. Во фрагменте кода этой программы:

import asyn

def callback(text):
    print(text)

barrier = asyn.Barrier(3, callback, ('Synch',))

async def report():
    for i in range(5):
        print('{} '.format(i), end='')
        await barrier

несколько экземпляров сопрограммы report печатают свой результат и делают паузу, пока другие экземпляры также не будут завершены и ожидают продолжения barrier. В этот момент выполняется обратный вызов. По его завершении возобновляется изначальная сопрограмма.

3.4 Семафор (Semaphore)

Семафор ограничивает число сопрограмм, которые могут получить доступ к ресурсу. Его можно использовать для ограничения количества экземпляров определенной сопрограммы, которые могут работать одновременно. Это выполняется с использованием счетчика доступа, который инициализируется конструктором и уменьшается каждый раз, когда сопрограмма получает семафор.

Самый простой способ использовать его в менеджере контекста:

import asyn

sema = asyn.Semaphore(3)

async def foo(sema):
    async with sema:
        # Здесь ограниченный доступ

Примером является функция semaphore_test в asyntest.py.

3.4.1 (Ограниченный) семафор

Работает аналогично классу Semaphore за исключением того, что, если release метод заставляет счетчик доступа превысить свое начальное значение, a ValueError устанавливается.

3.5 Очередь (Queye)

Класс Queue поддерживается официальной uasycio и пример программы aqtest.py демонстрирует его использование. Очередь создается следующим образом:

from  uasyncio.queues import Queue

q  =  Queue ()

Типичная сопрограмма-производитель может работать следующим образом:

async def producer(q):
    while True:
        result = await slow_process()  # так или иначе получить некоторые данные
        await q.put(result)  # сделать паузу, пока очередь с ограниченным размером не заполнится

и сопрограмма-потребитель может работать следующим образом:

async def consumer(q):
    while True:
        result = await(q.get())  # Остановиться, если q пуста
        print('Result was {}'.format(result))

Класс Queue предоставляет значительные дополнительные функциональные возможности в том случае, когда размер очередей может быть ограничен и может быть опрошен статус. Поведение при пустой очереди (если размер ограничен) и поведение при полной очереди может контролироваться. Документация об этом есть в коде.

3.6 Другие Классы синхронизации

Библиотека asyn.py предоставляет «микро» реализации еще некоторых возможностей CPython.

Класс Condition позволяет сопрограмме уведомлять другие сопрограммы, ождидающие на заблокированном ресурсе. После получения уведомления они получат доступ к ресурсу и снимут блокировку по очереди. Уведомляющая сопрограмма может ограничивать количество сопрограмм, подлежащих уведомлению.

Класс Gather позволяет запустить список сопрограмм. После завершения последней будет возвращен список результатов. Эта «микро» реализация использует другой синтаксис. Тайм-ауты могут быть применены к любой из сопрограмм.

4 Разработка классов для asyncio

В контексте разработки драйверов устройств цель состоит в том, чтобы обеспечить их неблокирующую работу. Сопрограмма-драйвер должна гарантировать, что другие сопрограммы будут исполняться, пока драйвер ожидает исполнения устройством аппаратных операций. Например, задача, ожидающая данных, поступающих в UART, или пользователь, нажимающий кнопку, должен позволять планировать другие события, пока событие не произойдет.

4.1 Классы с использованием ожидания await

Сопрограмма может приостановить выполнение, ожидая объект awaitable. Под CPython пользовательский класс awaitable создается путем реализации специального метода __await__, который возвращает генератор. Класс awaitable используется следующим образом:

import uasyncio as asyncio

class Foo():
    def __await__(self):
        for n in range(5):
            print('__await__ called')
            yield from asyncio.sleep(1) # Другие сопрограммы запланированы здесь
        return 42

    __iter__ = __await__  # См. примечание ниже 

async def bar():
    foo = Foo()  # Foo - awaitable Класс
    print('waiting for foo')
    res = await foo  # Получает результат
    print('done', res)

loop = asyncio.get_event_loop()
loop.run_until_complete(bar())

В настоящее время MicroPython не поддерживает __await__ (проблема # 2678) и для решения должна использоваться __iter__. Строка __iter__ = __await__ обеспечивает переносимость между CPython и MicroPython. Примеры кода смотрите в классах Event, Barrier, Cancellable, Condition в asyn.py.

4.1.1 Использование в контекстных менеджерах

Ожидаемые объекты могут использоваться в синхронных или асинхронных контекстных менеджерах, предоставляя необходимые специальные методы. Синтаксис:

with await awaitable as a:  # Условие 'as' является необязательным
    # код не приводится
async with awaitable as a:  # Асинхронный контекстный менеджер (см.ниже)
    # делать что-нибудь

Для достижения этого __await__ генератор должен вернуть self. Это передается любой переменной в as предложении, а также позволяет работать специальным методам. Посмотрите asyn.Condition и asyntest.condition_test где класс Condition использует await и при этом может использоваться в синхронном контекстном менеджере.

4.1.2 Await в сопрограмме

Язык Python требует, чтобы __await__ было функцией генератора. В MicroPython генераторы и сопрограммы идентичны, поэтому решение заключается в использовании yield from coro(args).

Цель этого руководства — предложить код, переносимый на CPython 3.5 или выше. В CPython генераторы и сопрограммы различны по смыслу. В CPython сопрограмма есть __await__ специальный метод, который извлекает генератор. Это переносимо:

up = False  # Стартуем под MicroPython?
try:
    import uasyncio as asyncio
    up = True  # Или можно использовать sys.implementation.name
except ImportError:
    import asyncio

async def times_two(n):  # Coro в ожидании
    await asyncio.sleep(1)
    return 2 * n

class Foo():
    def __await__(self):
        res = 1
        for n in range(5):
            print('__await__ called')
            if up:  # MicroPython
                res = yield from times_two(res)
            else:  # CPython
                res = yield from times_two(res).__await__()
        return res

    __iter__ = __await__

async def bar():
    foo = Foo()  # foo is awaitable
    print('waiting for foo')
    res = await foo  # Получить значение
    print('done', res)

loop = asyncio.get_event_loop()
loop.run_until_complete(bar())

Обратите внимание, что __await__, yield from asyncio.sleep(1) разрешены CPython. Я еще не понял, как это достигается.

4.2 Асинхронные итераторы

Асинхронные итераторы предоставляют средства для возврата конечной или бесконечной последовательности значений и могут использоваться в качестве средства извлечения последовательных элементов данных, когда они поступают с устройства только для чтения. Асинхронный итератор вызывает асинхронный код в своем методе next. Класс должен соответствовать следующим требованиям:

  • У него есть метод __aiter__, определенный в async def и возвращающий асинхронный итератор.
  • У него есть метод __anext__, который сам является сопрограммой — то есть определен через async def и содержащей, по крайней мере, один оператор await. Чтобы остановить итерацию, она должна вызвать StopAsyncIteration исключение.

Последовательные значения извлекаются c помощью async for как показано ниже:

class AsyncIterable:
    def __init__(self):
        self.data = (1, 2, 3, 4, 5)
        self.index = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

    async def fetch_data(self):
        await asyncio.sleep(0.1)  # Другие сопрограммы могут исполняться
        if self.index >= len(self.data):
            return None
        x = self.data[self.index]
        self.index += 1
        return x

async def run():
    ai = AsyncIterable()
    async for x in ai:
        print(x)

4.3 Асинхронные контекстные менеджеры

Классы могут быть разработаны для поддержки асинхронных контекстных менеджеров, имеющих процедуры входа и выхода, которые являются сопрограмами. Примером является Класс Lock, описанный выше. Он имеет сопрограмму __aenter__, которая логически требуется для асинхронной работы. Для поддержки асинхронного протокола контекстного менеджера его __aexit__ метод также должен быть сопрограмой, что достигается путем включения await asyncio.sleep(0). Такие классы доступны изнутри сопрограммы со следующим синтаксисом:

async  def  bar ( lock ):
      async with lock:
          print ( «блокировка bar получена» )

Как и в случае с обычными менеджерами контекста, гарантированно будет вызван метод выхода, когда менеджер контекста завершит работу, как обычно, так и через исключение. Для достижения этой цели используются специальные методы __aenter__и __aexit__ которые должны быть определены, как сопрограммы, ожидающие другую сопрограмму или awaitable объект. Этот пример взят из класса Lock:

 async def __aenter__(self):
        await self.acquire()  # a coro определена с помощью async def
        return self

    async def __aexit__(self, *args):
        self.release()  # Обычный метод
        await asyncio.sleep_ms(0)

Если async with содержит предложение as variable, переменная получает значение, возвращаемое __aenter__.

Для обеспечения корректного поведения прошивка должна быть V1.9.10 или новее.

5.Исключения от тайм-аутов и из-за отмены задач

Эти темы связаны между собой: uasyncio включает отмену задач и применение тайм-аута к задаче, выдавая исключение для задачи особым образом.

5.1 Исключения

Если в сопрограмме возникает исключение (exeption), оно должно быть обработано либо в этой сопрограмме, либо в сопрограмме, ожидающей ее завершения. Это гарантирует, что исключение не распространяется на планировщик. Если исключение произойдет, планировщик прекратит работу, передав исключение в код, который запустил планировщик. Следовательно, чтобы избежать остановки планировщика, сопрограмма, запущенная с loop.create_task() должна перехватывать любые исключения внутри.

Использование throw или close чтобы запустить исключение в сопрограмме, неразумно. Это разрушает uasyncio, заставляя сопрограмму запускаться и, возможно, завершаться, когда он все еще находится в очереди на исполнение.

Приведенный пример иллюстрирует эту ситуацию. Если разрешено работать до конца, он работает как ожидалось.

import uasyncio as asyncio

async def foo():
    await asyncio.sleep(3)
    print('About to throw exception.')
    1/0

async def bar():
    try:
        await foo()
    except ZeroDivisionError:
        print('foo прерывается из-за деления на 0')  # Случилось!
        raise  # обеспечить выход путем распространения цикла.
    except KeyboardInterrupt:
        print('foo was interrupted by ctrl-c')  # НИКОГДА НЕ СЛУЧИТСЯ!
        raise

async def shutdown():
    print('Shutdown is running.')  # Случится в обоих случаях
    await asyncio.sleep(1)
    print('done')

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(bar())
except ZeroDivisionError:
    loop.run_until_complete(shutdown())
except KeyboardInterrupt:
    print('Keyboard interrupt at loop level.')
    loop.run_until_complete(shutdown())

Однако выдача прерывания клавиатуры приводит к тому, что исключение переходит в цикл обработки событий. Это связано с тем, что выполнение uasyncio.sleep передается в цикл обработки событий. Следовательно, приложения, требующие кода очистки в ответ на прерывание клавиатуры, должны перехватывать исключение на уровне цикла событий.

5.2 Отмена и таймауты

Как указывалось выше, эти функции работают, вызывая исключение для задачи особым образом, используя специальный метод MicroPython сопрограмму pend_throw. Как это работает, зависит от версии. В официальной uasyncio v.2.0 исключение не обрабатывается до следующего запланированного задания. Это налагает задержку, если задача ожидает sleep ввода-вывода. Тайм-ауты могут выходить за пределы своего номинального периода. Задача отмены других задач не может определить, когда отмена завершена.

В настоящее время существует обходной путь и два решения.

  • Обходной путь: библиотека asyn предоставляет средства ожидания отмены задач или групп задач. См.Отмена задания (раздел 5.2.1.) .
  • Библиотека Paul Sokolovsky предоставляет uasyncio v2.4, но для этого требуется его прошивка Pycopy.
  • Fast_io библиотека uasyncio решает эту проблему в Python (в менее элегантной манере) и работает под управлением официальной прошивки.

Иерархия исключений, используемая здесь Exception-CancelledError-TimeoutError.

5.2.1 Отмена задания

uasyncio обеспечивает функцию cancel(coro). Это работает, выбрасывая исключение для использования сопрограммы pend_throw. Так же это работает с вложенными сопрограммами. Использование заключается в следующем:

async def foo():
    while True:
        # делать что-то каждые 10 secs
        await asyncio.sleep(10)

async def bar(loop):
    foo_instance = foo()  # Создать вхождение coro
    loop.create_task(foo_instance)
    # code omitted
    asyncio.cancel(foo_instance)

Если этот пример запустить под uasyncio v2.0, то когда bar выдаст cancel он не вступит в силу до следующего запланированного foo и при отмене foo может возникнуть задержка до 10 секунд. Другой источник задержки возникнет, если foo ожидает ввода-вывода. Где бы задержка не возникла, bar не сможет определить, была ли foo отменена. Это имеет значение в некоторых случаях использования.

При использовании библиотек Paul Sokolovsky или fast_io достаточно применить sleep(0):

async def foo():
    while True:
        # делать что-то каждые 10 secs
        await asyncio.sleep(10)

async def bar(loop):
    foo_instance = foo()  # Создать вхождение coro
    loop.create_task(foo_instance)
    # код не приводится
    asyncio.cancel(foo_instance)
    await asyncio.sleep(0)
    # Теперь задача отменена

Это также будет работать в uasyncio v2.0, если foo (и любой ожидающей сопрограммы foo) никогда не выдавал sleep и не ождая ввода / вывода.

Поведение, которое может удивить неосторожного, возникает, когда ожидается отмена сопрограммы, запущенной create_task и находящеяся в режиме ожидания. Рассмотрим этот фрагмент:

async def foo():
    while True:
        # делать что-то каждые 10 secs
        await asyncio.sleep(10)

async def foo_runner(foo_instance):
    await foo_instance
    print('Это не будет напечатано')

async def bar(loop):
    foo_instance = foo()
    loop.create_task(foo_runner(foo_instance))
    # код не приводится
    asyncio.cancel(foo_instance)

Когда foo отменяется, она удаляется из очереди планировщика; потому что в ней отсутствует return инструкция, вызывающая процедура foo_runner никогда не возобновляется. Рекомендуется всегда перехватывать исключение в самой внешней области действия функции, подлежащей отмене:

async def foo():
    try:
        while True:
            await asyncio.sleep(10)
            await my_coro
    except asyncio.CancelledError:
        return

В этом случае my_coro не нужно перехватывать исключение, так как оно будет распространено в вызывающий канал и захвачено там.

Примечание. Запрещается использовать методы close или throw методы сопрограмм, когда сопрограмма используется вне планировщика. Это подрывает планировщик, заставляя сопрограмму выполнять код, даже если он не запланирован. Это может иметь нежелательные последствия.

5.2.2 Сопрограммы с таймаутами

Таймауты реализуются с помощью uasyncio методов .wait_for() и .wait_for_ms(). Они принимают в качестве аргументов сопрограмму и время ожидания в секундах или мс соответственно. Если тайм-аут истекает, TimeoutError будет вброшен в сопрограмму с помощью pend_throw. Это исключение должно быть перехвачено либо пользователем, либо вызывающим абонентом. Это необходимо по причине, описанной выше: если время ожидания истекает, оно отменяется. Если только ошибка не будет перехвачена и не вернётся, единственный путь, по которому вызывающий может продолжить, — это перехват самого исключения.

Там, где исключение перехвачено сопрограммой, у меня возникли неясные сбои, если исключение не перехвачено во внешней области видимости, как показано ниже:

import uasyncio as asyncio

async def forever():
    try:
        print('Starting')
        while True:
            await asyncio.sleep_ms(300)
            print('Got here')
    except asyncio.TimeoutError:
        print('Got timeout')  # And return

async def foo():
    await asyncio.wait_for(forever(), 5)
    await asyncio.sleep(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(foo())

В качестве альтернативы можно перехватить вызывающей функцией:

import uasyncio as asyncio

async def forever():
    print('Starting')
    while True:
        await asyncio.sleep_ms(300)
        print('Got here')

async def foo():
    try:
        await asyncio.wait_for(forever(), 5)
    except asyncio.TimeoutError:
        pass
    print('Timeout elapsed.')
    await asyncio.sleep(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(foo())

Примечение для Uasyncio v2.0.

Это не применимо к библиотекам Paul Sokolovsky или fast_io.

Если сопрограмма запускает await asyncio.sleep(t), с большой задержкой t, сопрограмма не будет перезагружена до истечения t. Если тайм-аут истек до того, как завершится sleep, произойдет TimeoutError при перезагрузке сопрограммы — т.е. когда истечет t. В режиме реального времени и с точки зрения вызывающего абонента, его ответ TimeoutError будет отложен.

Если это важно для приложения, создайте длинную задержку, ожидая короткую в цикле. Сопрограмма asyn.sleep поддерживает это.

6 Взаимодействие с оборудованием

В основе взаимодействия между uasyncio и внешними асинхронными событиями лежат опросы (polling). Аппаратное обеспечение, требующее быстрого ответа, может использовать прерывание. Но взаимодействие между подпрограммой обработки прерываний (ISR) и пользовательской сопрограммой будет основано на опросах. Например, ISR может вызвать Event или установить глобальный флаг, в то время как сопрограмма, ожидающая результата, опрашивает объект каждый раз, когда запрос запланирован.

Опрос может осуществляться двумя способами, явным или неявным. Последнее выполняется с использованием stream I/O механизма, который представляет собой систему, предназначенную для потоковых устройств, таких как UART и сокеты. В самом простом явном опросе может состоять такой код:

async def poll_my_device():
    global my_flag  # Установлен аппаратным ISR
    while True:
        if my_flag:
            my_flag = False
            # service the device
        await asyncio.sleep(0)

Вместо глобального флага можно использовать переменную экземпляра класса Event или экземпляр класса, использующего await. Явный опрос обсуждается ниже.

Неявный опрос состоит из разработки драйвера, который будет работать как потоковое устройство ввода-вывода, такое как UART или сокет stream I/O, который опрашивает устройства, используя систему select.poll Python: поскольку опрос выполняется в C, он быстрее и эффективнее, чем явный опрос. Использование stream I/O обсуждается в разделе 6.3.

Благодаря своей эффективности неявный опрос дает преимущество наиболее быстрым драйверам устройств ввода / вывода: потоковые драйверы могут быть созданы для многих устройств, которые обычно не рассматриваются как потоковые устройства. Более подробно это обсуждается в разделе 6.4.

6.1 Проблемы синхронизации

И явный, и неявный опрос в настоящее время основаны на циклическом планировании. Предположим, что ввод-вывод работает одновременно с N пользовательскими сопрограммами, каждая из которых исполняется с нулевой задержкой. Когда ввод-вывод будет обслужен, он будет затем опрошен, как только все пользовательские операции будут запланированы. Предполагаемая задержка должна учитываться при проектировании. Каналы ввода / вывода могут потребовать буферизации, при этом ISR обслуживает оборудование в режиме реального времени от буферов и сопрограмм, заполняя или освобождая буферы в более медленное время.

Также необходимо учитывать возможность выхода за пределы: это тот случай, когда что-то, опрашиваемое сопрограммой, происходит более одного раза до того, как на самом деле запланирована сопрограммой.

Другой проблемой синхронизации является точность задержек. Если сопрограмма издает

await  asyncio.sleep_ms ( t )      
# следующая строка

планировщик гарантирует, что выполнение будет приостановлено как минимум на t мс. Фактическая задержка может быть больше чем t, что зависит от текущей загрузки системы. Если в это время другие сопрограммы находятся в ожидании завершения ненулевых задержек, следующая строка будет немедленно запланирована к исполнению. Но если другие сопрограммы так же ожидают выполнения (либо потому, что они издали нулевую задержку, либо потому, что их время также истекло), они могут быть запланированы к исполнению раньше. Это вносит неопределенность синхронизации в sleep() и sleep_ms() функции. Значение наихудшего случая для этого переполнения может быть рассчитано путем суммирования значений времени выполнения всех таких сопрограмм для определения наихудшего времени передачи в планировщик.

Fast_io версии uasyncio в этом контексте обеспечивает способ гарантировать, что потоковый ввод/вывод будет опрашиваться на каждой итерации планировщика. Есть надежда, что официальное uasyncio примет соответствующие поправки в свое время.

6.2 Опрос устройств с помощью сопрограмм

Это простой подход, который наиболее подходит для устройств, которые могут опрашиваться с относительно низкой скоростью. Это связано прежде всего с тем, что опрос с коротким (или нулевым) интервалом опроса может привести к тому, что сопрограмма потребляет больше процессорного времени, чем желательно для попадания в интервал.

Пример apoll.py демонстрирует этот подход, опрашивая акселерометр Pyboard с интервалом 100 мс. Он выполняет простую фильтрацию, чтобы игнорировать шум, и печатает сообщение каждые две секунды, если перемещения не происходит.

Пример aswitch.py представляет драйверы для переключателей и кнопочных устройств.

Пример драйвера для устройства, способного считывать и записывать, показан ниже. Для удобства тестирования Pyboard UART 4 эмулирует условное устройство. Драйвер реализует Класс RecordOrientedUart, в котором данные поставляются в записях переменной длины, состоящих из байтовых экземпляров. Объект добавляет разделитель перед отправкой и буферизует входящие данные, пока не будет получен добавленный разделитель. Это лишь демонстрационная версия и неэффективный способ использования UART по сравнению с потоковым вводом/выводом.

В целях демонстрации асинхронной передачи мы предполагаем, что эмулируемое устройство имеет средство проверки того, что передача завершена и что приложение требует, чтобы мы подождали этого. Ни одно из предположений не является верным в этом примере, но код подделывает его путем await asyncio.sleep(0.1).

Для запуска не забудьте соединить выводы Pyboard X1 и X2 (UART Txd и Rxd)

import uasyncio as asyncio
from pyb import UART

class RecordOrientedUart():
    DELIMITER = b'\0'
    def __init__(self):
        self.uart = UART(4, 9600)
        self.data = b''

    def __iter__(self):  # Not __await__ issue #2678
        data = b''
        while not data.endswith(self.DELIMITER):
            yield from asyncio.sleep(0) # Необходимо, потому что:
            while not self.uart.any():
                yield from asyncio.sleep(0) # timing may mean this is never called
            data = b''.join((data, self.uart.read(self.uart.any())))
        self.data = data

    async def send_record(self, data):
        data = b''.join((data, self.DELIMITER))
        self.uart.write(data)
        await self._send_complete()

    # В реальном драйвере устройства мы бы опрашивали аппаратное устройство
    # на предмет завершения в цикле с помощью await asyncio.sleep(0)
    async def _send_complete(self):
        await asyncio.sleep(0.1)

    def read_record(self):  # Synchronous: await the object before calling
        return self.data[0:-1] # Discard delimiter

async def run():
    foo = RecordOrientedUart()
    rx_data = b''
    await foo.send_record(b'A line of text.')
    for _ in range(20):
        await foo  # Другие coros исполняются планировщиком пока мы ожидаем готовности foo
        rx_data = foo.read_record()
        print('Got: {}'.format(rx_data))
        await foo.send_record(rx_data)
        rx_data = b''

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

6.3 Использование потокового механизма (Stream)

В примере демонстрируется одновременный ввод-вывод на одном UART микропроцессора Pyboard.

Для запуска следует соединить выводы Pyboard X1 и X2 (UART Txd и Rxd)

import uasyncio as asyncio
from pyb import UART
uart = UART(4, 9600)

async def sender():
    swriter = asyncio.StreamWriter(uart, {})
    while True:
        await swriter.awrite('Hello uart\n')
        await asyncio.sleep(2)

async def receiver():
    sreader = asyncio.StreamReader(uart)
    while True:
        res = await sreader.readline()
        print('Received', res)

loop = asyncio.get_event_loop()
loop.create_task(sender())
loop.create_task(receiver())
loop.run_forever()

Поддерживающий код можно найти в __init__.py в uasyncio библиотеке. Механизм работает, так как драйвер устройства (написанный на C) реализует следующие методы: ioctl, read, readline и write. Раздел 6.4.Написание драйвера потокового устройства раскрывает подробности о том, как такие драйверы могут быть написаны на Python.

UART может получать данные в любое время. Механизм потокового ввода-вывода проверяет наличие ожидающих входящих символов всякий раз, когда планировщик получает контроль. Когда сопрограмма работает, программа обработки прерываний буферизует входящие символы; они будут удалены, когда сопрограмма уступит время планировщику. Следовательно, приложения UART должны быть спроектированы таким образом, чтобы сопрограммы минимизировали время между передачей в планировщик, чтобы избежать переполнения буфера и потери данных. Это можно улучшить, используя больший буфер чтения UART, либо меньшую скорость передачи данных. В качестве альтернативы аппаратное управление потоком обеспечит решение, если источник данных его поддерживает.

6.3.1 Пример драйвера UART

Программа auart_hd.py иллюстрирует способ связи с полудуплексным устройством, таким как устройство, отвечающее на набор команд модема «AT». Полудуплекс означает, что устройство никогда не отправляет незапрошенные данные: его передачи всегда выполняются в ответ на полученную команду от мастера.

Устройство эмулируется путем запуска тест на Pyboard с двумя проводными связями.

(Очень упрощенное) эмулируемое устройство реагирует на любую команду, посылая четыре строки данных с паузой между каждой, чтобы имитировать медленную обработку.

Мастер отправляет команду, но заранее не знает, сколько строк данных будет возвращено. Он запускает таймер повторного запуска, который перезапускается каждый раз при получении строки. Когда таймер истекает, предполагается, что устройство завершило передачу, и возвращается список принятых строк.

Также продемонстрирован случай отказа устройства, что достигается путем пропуска передачи перед ожиданием ответа. После истечения времени ожидания возвращается пустой список. Смотрите комментарии к коду для более подробной информации.

6.4 Разработка драйвера потокового (Stream) устройства

Механизм потокового ввода/вывода (stream I/O) предназначен для управления работой потоковых устройств ввода/вывода, таких как UART и сокеты(socket). Механизм может использоваться драйверами любого регулярно опрашиваемого устройства путем делегирования планировщику, который использует select, опроса готовности любых устройств в очереди. Это более эффективно, чем выполнение нескольких операций сопрограмм, каждый из которых опрашивает устройство, отчасти потому, что select написано на C, а также потому, что сопрограмма, выполняющая опрос, откладывается до тех пор, пока опрашиваемый объект не вернет состояние готовности.

Драйвер устройства, способный обслуживать механизм потокового ввода/вывода желательно должен поддерживать методы StreamReader, StreamWriter . Читаемое устройство должно обеспечивать как минимум один из следующих методов. Обратите внимание, что это синхронные методы. Метод ioctl (смотри ниже) гарантирует, что они называются только тогда, когда имеются данные. Методы должны возвращаться как можно быстрее, используя столько данных, сколько доступно.

readline() Вернуть столько символов, сколько доступно, вплоть до любого символа новой строки. Требуется, если использовать StreamReader.readline()

read(n) Вернуть столько символов, сколько доступно, но не более n. Требуется, если использовать StreamReader.read() или StreamReader.readexactly()

Создаваемый драйвер должен обеспечить следующий синхронный метод при этом с немедленным возвратом:

write с аргументами buf, off, sz.

Где:

buf — это буфер для записи.
off — смещение в буфер первого символа для записи.
sz — запрашиваемое количество символов для записи.
Возвращаемое значение — количество фактически написанных символов (может быть 1, если устройство работает медленно).
Метод ioctl гарантирует, что будет вызван только тогда, когда устройство будет готово к приему данных.

Все устройства должны обеспечивать метод ioctl, который опрашивает оборудование для определения его состояния готовности. Типичный пример для драйвера чтения/записи:

import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)

class MyIO(io.IOBase):
    # Методы на показаны
    def ioctl(self, req, arg):  # see ports/stm32/uart.c
        ret = MP_STREAM_ERROR
        if req == MP_STREAM_POLL:
            ret = 0
            if arg & MP_STREAM_POLL_RD:
                if hardware_has_at_least_one_char_to_read:
                    ret |= MP_STREAM_POLL_RD
            if arg & MP_STREAM_POLL_WR:
                if hardware_can_accept_at_least_one_write_character:
                    ret |= MP_STREAM_POLL_WR
        return ret

Ниже приведено описание Класса MillisecTimer ожидания задержки:

import uasyncio as asyncio
import utime
import io

MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)

class MillisecTimer(io.IOBase):
    def __init__(self):
        self.end = 0
        self.sreader = asyncio.StreamReader(self)

    def __iter__(self):
        await self.sreader.readline()

    def __call__(self, ms):
        self.end = utime.ticks_add(utime.ticks_ms(), ms)
        return self

    def readline(self):
        return b'\n'

    def ioctl(self, req, arg):
        ret = MP_STREAM_ERROR
        if req == MP_STREAM_POLL:
            ret = 0
            if arg & MP_STREAM_POLL_RD:
                if utime.ticks_diff(utime.ticks_ms(), self.end) >= 0:
                    ret |= MP_STREAM_POLL_RD
        return ret

который может быть использован следующим образом:

async  def  timer_test ( n ):
      timer  =  ms_timer.MillisecTimer ()
      await timer ( 30 )   # Пауза 30 мс

По сравнению с официальной uasyncio подобная реализация не дает никаких преимуществ по сравнению с await asyncio.sleep_ms(). Применение fast_io обеспечивает значительно более точные задержки при обычной схеме использования, когда сопрограммы ожидают нулевую задержку.

Можно использовать планирование ввода / вывода, чтобы связать событие с обратным вызовом. Это более эффективно, чем цикл опроса, потому что выполнение опроса не запланировано, пока ioctl не вернется состояние готовности. Далее выполняется обратный вызов, когда обратный вызов меняет состояние.

import uasyncio as asyncio
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)

class PinCall(io.IOBase):
    def __init__(self, pin, *, cb_rise=None, cbr_args=(), cb_fall=None, cbf_args=()):
        self.pin = pin
        self.cb_rise = cb_rise
        self.cbr_args = cbr_args
        self.cb_fall = cb_fall
        self.cbf_args = cbf_args
        self.pinval = pin.value()
        self.sreader = asyncio.StreamReader(self)
        loop = asyncio.get_event_loop()
        loop.create_task(self.run())

    async def run(self):
        while True:
            await self.sreader.read(1)

    def read(self, _):
        v = self.pinval
        if v and self.cb_rise is not None:
            self.cb_rise(*self.cbr_args)
            return b'\n'
        if not v and self.cb_fall is not None:
            self.cb_fall(*self.cbf_args)
        return b'\n'

    def ioctl(self, req, arg):
        ret = MP_STREAM_ERROR
        if req == MP_STREAM_POLL:
            ret = 0
            if arg & MP_STREAM_POLL_RD:
                v = self.pin.value()
                if v != self.pinval:
                    self.pinval = v
                    ret = MP_STREAM_POLL_RD
        return ret

И вновь — на официальном uasyncio задержка может высокой. В зависимости от дизайна приложения версия fast_io может оказаться более эффективной.

Демонстрационная программа iorw.py иллюстрирует полный пример. Обратите внимание, что на момент написания статьи в официальном uasyncio есть ошибка, из-за которой это не работает. Есть два решения. Обходной путь — написать два отдельных драйвера, один только для чтения, а другой только для записи. Второй — использовать fast_io, который решает эту проблему.

В официальном uasyncio ввод/вывод планируется довольно редко.

6.5 Полный пример: aremote.py

Драйвер предназначен для приема/декодирования сигналов с инфракрасного пульта дистанционного управления. Сам драйвер aremote.py. Следующие примечания являются существенными касательно использования asyncio.

Прерывание на контакте записывает время изменения состояния (в мкс) и устанавливает событие, пропуская время, когда произошло первое изменение состояния. Сопрограмма ожидает события, выдает длительность пакета данных, затем декодирует сохраненные данные перед вызовом указанного пользователем обратного вызова.

Передача времени экземпляру Event позволяет сопрограмме компенсировать любую asyncio задержку при настройке периода задержки.

6.6 HTU21D датчик окружающей среды

Драйвер чипа HTU21D обеспечивает точные измерения температуры и влажности.

Чипу требуется порядка 120 мс, чтобы получить оба элемента данных. Драйвер работает асинхронно, инициируя получение и использование await asyncio.sleep(t) до чтения данных, обновляет переменные temperature и humidity, к которым можно получить доступ в любой момент, что позволяет другим сопрограммам запускаться во время работы драйвера чипа.

7.Советы и подсказки

7.1 Программа зависает

Зависание обычно происходит из-за того, что задача заблокирована без уступки: это приведет к зависанию всей системы. При разработке полезно иметь сопрограмму, которая периодически включает встроенный светодиод. Это обеспечивает подтверждение того, что планировщик еще работает.

7.2 uasyncio сохраняет состояние

При запуске программ, использующих uasyncio в REPL, выполните программный сброс (ctrl-D) между запусками. В связи с тем, что uasyncio сохраняет состояние между запусками, при очередном запуске может происходить непредсказуемое поведению.

7.3 Сборка мусора

Можно исполнить сопрограмму, предварительно указав import gc:

gc.collect ()
gc.treshold ( gc.mem_free () //  4  +  gc.mem_alloc ())

Цель этого обсуждается здесь, в разделе о куче (heap).

7.4 Тестирование

Желательно убедиться в том, что драйвер устройства удерживает контроль, когда это необходимо, что можно сделать, запустив один или несколько экземпляров фиктивных сопрограмм, запускающих цикл печати сообщения и проверяя, что оно выполняется в периоды, когда драйвер находится в режиме ожидания:

async def rr(n):
    while True:
        print('Roundrobin ', n)
        await asyncio.sleep(0)

В качестве примера типа опасности, которая может возникнуть, в приведенном выше примере RecordOrientedUart __await__ метод изначально был записан как:

def __await__(self):
     data = b''
     while not data.endswith(self.DELIMITER):
          while not self.uart.any():
                yield from asyncio.sleep(0)
          data = b''.join((data, self.uart.read(self.uart.any())))
     self.data = data

В результате исполнение растягивается до тех пор, пока не будет получена вся запись, а так же с тем, что uart.any() всегда возвращает ненулевое количество полученных символов. К моменту вызова все символы могут быть уже получены. Такую ситуацию можно разрешить использованием внешнего цикла:

def __await__(self):
        data = b''
        while not data.endswith(self.DELIMITER):
            yield from asyncio.sleep(0) # Необходимо, так как:
            while not self.uart.any():
                yield from asyncio.sleep(0) # возможно по таймингу никогда не будет исполнено
            data = b''.join((data, self.uart.read(self.uart.any())))
        self.data = data

Возможно, стоит отметить, что эта ошибка не была бы очевидной, если бы данные отправлялись в UART с более низкой скоростью, а не с помощью теста с обратной связью. Добро пожаловать в радости программирования в реальном времени.

7.5 Распространенная ошибка

Если функция или метод определены async def и впоследствии вызваны так, как если бы они были обычными (синхронными) вызываемыми, MicroPython не выдает сообщение об ошибке. Это по замыслу. Обычно это приводит к тому, что программа молча не работает правильно:

async def foo():
    # code

loop.create_task(foo)  # Вариант 1 1: foo никогда не будет исполняться
foo()  # Вариант 2: Аналогично.

У меня есть предложение, которое предлагает ситуацию исправить в 1 варианте с помощью fast_io.

Модуль check_async_code.py пытается обнаружить случаи сомнительного использования сопрограмм. Он написан на Python3 и предназначен для работы на ПК. Используется в сценариях, написанных в соответствии с рекомендациями, изложенными в этом руководстве с сопрограммами, объявленными с использованием async def. Модуль принимает один аргумент, путь к исходному файлу MicroPython (или --help).

Обратите внимание, что он несколько грубоват и предназначен для использования в синтаксически правильном файле, который по умолчанию не запускается. Используйте инструмент, например, pylint для общей проверки синтаксиса (в pylint в настоящее время эта ошибка отсутствует).

Сценарий выдает ложные срабатывания. По замыслу сопрограммы являются объектами первого уровня, их можно передавать функциям и хранить их в структурах данных. В зависимости от логики программы можно сохранить функцию или результат ее выполнения. Сценарий не может определить намерение. Он направлен на игнорирование случаев, которые кажутся правильными, при выявлении других случаев для рассмотрения. Предположим foo, где сопрограмма объявлена как async def:

loop.run_until_complete(foo())  # Нет предупреждения
bar(foo)  # Эти строки могут вызвать предупреждение, но могут оказаться и неправильными
bar(foo())
z = (foo,)
z = (foo(),)
foo()  # Получит предупреждение: безусловно неправильное использование.

Я нахожу это полезным как есть, но улучшения всегда приветствуются.

7.6 Программирование с использованием сокетов (sockets)

Существует два основных подхода к программированию сокетов uasyncio. По умолчанию, сокеты блокируются до завершения указанной операции чтения или записи. Uasyncio поддерживает блокировку сокетов, используя select.poll для предотвращения их блокирования планировщиком. В большинстве случаев проще всего использовать именно этот механизм. Пример клиентского и серверного кода можно найти в каталоге client_server. Userver использует приложение select.poll явно опрашивая сокет сервера.

Клиентские сокеты используют его неявно в том смысле, что потоковый механизм uasyncio непосредственно использует его.

Обратите внимание, что socket.getaddrinfo в настоящее время блокируемые. Время в примере кода будет минимальным, но если требуется поиск DNS, период блокировки может быть значительным.

Второй подход к программированию сокетов заключается в использовании неблокирующих сокетов. Это добавляет сложности, но необходимо в некоторых приложениях, особенно если подключение осуществляется через WiFi (см. Ниже).

На момент написания статьи (март 2019 г.) поддержка TLS для неблокирующих сокетов находилась в стадии разработки. Его точный статус неизвестен (мне).

Использование неблокирующих сокетов требует некоторого внимания к деталям. Если неблокирующее чтение выполняется из-за задержки сервера, нет гарантии, что все (или любые) запрошенные данные будут возвращены. Аналогичным образом записи могут не перейти к завершению.

Следовательно, асинхронные методы чтения и записи должны итеративно выполнять неблокирующую операцию, пока требуемые данные не будут прочитаны или записаны. На практике может потребоваться тайм-аут, чтобы справиться с перебоями в работе сервера.
Еще одним осложнением является то, что порт ESP32 имел проблемы, которые требовали довольно неприятных взломов для безошибочной работы. Я не проверял, так ли это до сих пор.
Модуль sock_nonblock.py иллюстрирует требуемые методы. Это не рабочая демонстрация и решения, вероятно, будут зависеть от приложения.

7.6.1 Проблемы с WiFi

Потоковый механизм uasyncio не лучший вариант при обнаружении WiFi отключений. Я счел необходимым использовать неблокирующие сокеты для обеспечения отказоустойчивой работы и переподключения клиента при наличии сбоев.

Этот документ описывает проблемы, с которыми я столкнулся в приложениях WiFi, которые держат сокеты открытыми в течение длительных периодов, и обрисовывает в общих чертах решение.

Pltcm предлагает устойчивый асинхронный MQTT-клиент, который обеспечивает целостность сообщений при сбоях WiFi. Описан простой асинхронный полнодуплексный последовательный канал между беспроводным клиентом и проводным сервером с гарантированной доставкой сообщений.

7.7 Аргументы конструктора цикла событий

Небольшая ошибка может возникнуть, если вам нужно создать цикл обработки событий со значениями, отличающимися от заданных по умолчанию. Такой цикл необходимо декларировать перед запуском любого другого кода с использованием asyncio потому, что эти значения могут потребоваться в этом коде. В противном случае код будет инициализирован значениями по умолчанию:

import uasyncio as asyncio
import some_module

bar = some_module.Bar()  # Конструктор вызывает get_event_loop()
# задавая значения по умолчанию
loop = asyncio.get_event_loop(runq_len=40, waitq_len=40)

Учитывая, что импорт модуля может выполнять код, самый безопасный способ — создать экземпляр цикла событий сразу после импорта uasyncio.

import uasyncio as asyncio

loop = asyncio.get_event_loop(runq_len=40, waitq_len=40)
import some_module
bar = some_module.Bar()  # get_event_loop() вызов теперь безопасен

Я предпочтитаю при написании модулей для использования другими программами избегать запуска кода uasyncio при импорте. Напишите функции и методы для ожидания цикла событий как аргумента. Затем убедитесь, что только приложения верхнего уровня вызывают get_event_loop:

import uasyncio as asyncio
import my_module  # Не запускает код при загрузке

loop = asyncio.get_event_loop(runq_len=40, waitq_len=40)
bar = my_module.Bar(loop)

Этот вопрос обсуждается здесь.

8 заметок для начинающих

Эти заметки предназначены для новичков в асинхронном коде и начинаются с описания проблем, которые стараются решить планировщики, а так же дают обзор подхода uasyncio к решениям.

Раздел 8.5 обсуждает относительные достоинства модулей uasyncio и _thread, а так же почему вы можете предпочесть использование сопрограмм uasyncio упреждающему планированию ( _thread).

8.1 Проблема 1: циклы событий

Типичное приложение прошивки работает непрерывно и при этом должно реагировать на внешние события, которые могут включать в себя изменение напряжения на АЦП, появление аппаратного прерывания, или символа поступившего в UART, или данных доступных в сокете. Эти события происходят асинхронно, и код должен иметь возможность отвечать независимо от того, в каком порядке они происходят. Кроме того, может потребоваться выполнение задач, зависящих от времени, например, мигание светодиодов.

Очевидный способ сделать это с помощью цикла событий uasycio. Этот пример не является практическим кодом, но служит для иллюстрации общей формы цикла событий.

def event_loop():
    led_1_time = 0
    led_1_period = 20
    led_2_time = 0
    led_2_period = 30

    switch_state = switch.state()  # Текущее состояние переключателя

    while True:
        time_now = utime.time()
        if time_now >= led_1_time:  # Мигание LED #1
            led1.toggle()
            led_1_time = time_now + led_1_period
        if time_now >= led_2_time:  # Мигание LED #2
            led2.toggle()
            led_2_time = time_now + led_2_period
        # Можно прицепить еще LEDs

        if switch.value() != switch_state:
            switch_state = switch.value()
            # делать что-нибудь

        if uart.any():
            # обрабатывать ввод с UART

Такой цикл работает для простых примеров, но по мере увеличения количества событий код быстро становятся громоздкими. Они также нарушают принципы объектно-ориентированного программирования, объединяя большую часть программной логики в одном месте, а не связывая код с контролируемым объектом. Мы хотим разработать класс для светодиода, способного мигать, который можно вставить в модуль и импортировать. Подход ООП к миганию светодиода может выглядеть следующим образом:

import pyb

class LED_flashable():
    def __init__(self, led_no):
        self.led = pyb.LED(led_no)

    def flash(self, period):
        while True:
            self.led.toggle()
            # каким-то образом ждать завершения периода period,
            # при этом позволяя другим задачам исполняться одновременно с ожиданием

Планировщик в uasyncio позволяет создавать такие классы.

8.2 Проблема 2: методы блокировки

Предположим, вам нужно прочитать некоторое количество байтов из сокета. Если вы вызываете socket.read(n) с блокирующим сокетом по умолчанию, он будет «блокироваться» (то есть не сможет завершиться), пока не будут получены n байтов. В течение этого периода приложение не будет реагировать на другие события.

С помощью неблокирующего сокета uasyncio вы можете написать метод асинхронного чтения. Задача, требующая данных, будет (обязательно) блокироваться до тех пор, пока они не будут получены, но в течение этого периода будут выполняться другие задачи, что позволит приложению оставаться отзывчивым.

8.3. Подходы uasyncio

В следующем классе предусмотрен светодиод, который можно включать и выключать, а также можно мигать с произвольной скоростью. Экземпляр LED_async использует метод run, который можно использовать для непрерывной работы. Поведением светодиодов можно управлять с помощью методов on(), off() и flash(secs).

import pyb
import uasyncio as asyncio

class LED_async():
    def __init__(self, led_no):
        self.led = pyb.LED(led_no)
        self.rate = 0
        loop = asyncio.get_event_loop()
        loop.create_task(self.run())

    async def run(self):
        while True:
            if self.rate <= 0:
                await asyncio.sleep_ms(200)
            else:
                self.led.toggle()
                await asyncio.sleep_ms(int(500 / self.rate))

    def flash(self, rate):
        self.rate = rate

    def on(self):
        self.led.on()
        self.rate = 0

    def off(self):
        self.led.off()
        self.rate = 0

Следует отметить, что on(), off() и flash() представляют из себя обычные синхронные методы. Они изменяют поведение светодиода, но возвращаются немедленно. Мигание происходит «в фоновом режиме». Это подробно объясняется в следующем разделе.

Класс соответствует принципу ООП, заключающемуся в сохранении логики, связанной с устройством, в классе. При этом, применение uasyncio гарантирует, что во время мигания светодиода приложение может реагировать на другие события. Программа, приведенная ниже, мигает четырьмя светодиодами Pyboard с различной частотой, а также реагирует на кнопку USR, которая ее завершает.

import pyb
import uasyncio as asyncio

from led_async import LED_async  # Класс, описанный выше

async def killer():            # сопрограмма, необходимая для выхода из цикла
    sw = pyb.Switch()
    while not sw.value():
        await asyncio.sleep_ms(100)

leds = [LED_async(n) for n in range(1, 4)]

for n, led in enumerate(leds):
    led.flash(0.7 + n/4)

loop = asyncio.get_event_loop()
loop.run_until_complete(killer())

В отличие от первого примера цикла событий логика, связанная с переключателем, находится в функции, отдельной от функциональности светодиода. Обратите внимание на код, используемый для запуска планировщика:

loop = asyncio.get_event_loop()
loop.run_until_complete(killer())  # Выполнение передается сопрограммам
# Здесь будет продолжение только после завершения killer (),   
# когда планировщик остановился.

8.4 Планирование в uasyncio

Python 3.5 и MicroPython поддерживают понятие асинхронной функции, также известной как сопрограмма или задача. Сопрограмма должна включать хотя бы одно утверждение await.

async def hello():
    for _ in range(10):
        print('Hello world.')
        await asyncio.sleep(1)

Эта функция печатает сообщение десять раз с интервалом в одну секунду. Пока функция приостановлена в ожидании задержки, планировщик asyncio будет исполнять другие задачи, создавая иллюзию их одновременного выполнения.

Когда сопрограмма выдает await asyncio.sleep_ms() или await asyncio.sleep() текущая задача приостанавливается и помещается в очередь, которая упорядочена по времени и выполнение переходит к задаче, находящейся вверху очереди. Очередь спроектирована таким образом, что, даже если указанный спящий режим равен нулю, другие соответствующие задачи будут выполняться до возобновления текущей. Это «честное круговое» планирование. Обычной практикой является выполнение циклов await asyncio.sleep(0), чтобы задача не задерживала выполнение. Далее показан цикл «занято-ожидание», ожидающий другой задачи для установки глобальной переменной flag. Увы, он монополизирует процессор, предотвращая запуск других сопрограмм:

async def bad_code():
    global flag
    while not flag:
        pass                      # Плохо
    flag = False
    # дальнейший код не важен

Проблема здесь в том, что до тех пор, пока цикл flagis False управление не будет передано планировщику, поэтому никакая другая задача не будет запущена. Правильный подход:

async def good_code():
    global flag
    while not flag:
        await asyncio.sleep(0)   # Хорошо
    flag = False
    # дальнейший код не важен

По той же причине плохая практика задавать задержки, например, utime.sleep(1) потому что это блокирует другие задачи на 1 с; правильнее использовать await asyncio.sleep(1).
Обратите внимание, что задержки, формируемые методами uasyncio sleep и sleep_ms в реальности могут превышать указанное время. Это связано с тем, что во время задержки будут выполняться другие задачи. По истечении периода задержки выполнение не возобновится до тех пор, пока запущенная задача не выдаст await или не завершит работу. Хорошо себя ведущая сопрограмма всегда будет декларировать await через регулярные промежутки времени. Там, где требуется точная задержка, особенно если одна меньше нескольких мс, возможно необходимо использовать utime.sleep_us(us).

8.5 Почему совместное, а не потоковое планирование (_thread)?

Первоначальная реакция начинающих на идею совместного планирования сопрограмм часто вызывает разочарование. Наверняка потоковое планирование лучше? Почему я должен явно уступать контроль, если виртуальная машина Python может сделать это для меня?

Когда речь идет о встроенных системах, модель совместной работы имеет два преимущества.
Во-первых, это легкий вес. Возможно иметь большое количество сопрограмм, потому что в отличие от запланированных потоков, приостановленные сопрограммы занимают меньше места.
Во-вторых, это позволяет избежать некоторых тонких проблем, связанных с потоковым планированием.

На практике совместная многозадачность используется широко, особенно в приложениях с пользовательским интерфейсом.

В защиту модели с потоковым планированием покажу одно преимущество: если кто-то пишет

for  x in range ( 1000000 ):
      # сделать что-то трудоёмкое

это не заблокирует другие задачи. В модели совместной работы предполагается, что цикл должен явно давать контроль каждой задаче определенное количество итераций, например, помещая код в сопрограмму и периодически выпуская await asyncio.sleep(0).

Увы, это преимущество бледнеет по сравнению с недостатками. Некоторые из них описаны в документации по написанию обработчиков прерываний. В модели c потоковым планированием каждый поток может прерывать любой другой поток, изменяя данные, которые могут использоваться в других потоках. Как правило, гораздо проще найти и исправить блокировку, возникающую из-за ошибки, которая не дает результата, чем обнаружение иногда очень тонких и редко встречающихся ошибок, которые могут возникнуть в коде, написанном в рамках модели c потоковым планированием.

Проще говоря, если вы напишите сопрограмму MicroPython, вы можете быть уверены, что переменные не будут внезапно изменены другой сопрограммой: ваша сопрограмма имеет полный контроль, пока не выдаст await asyncio.sleep(0).

Имейте в виду, что обработчики прерываний являются преимущественными. Это относится как к аппаратным, так и к программным прерываниям, которые могут произойти в любой точке вашего кода.

Красноречивое обсуждение проблем потокового планирования можно найти здесь.

8.6 Взаимодействие

В нетривиальных приложениях сопрограммы должны взаимодействовать. При этом могут быть использованы обычные методы Python. К ним относится использование глобальных переменных или объявление сопрограмм в качестве методов объекта: они могут совместно использовать переменные экземпляра. В качестве альтернативы изменяемый объект может быть передан в качестве аргумента сопрограммы.

Модель c потоковым планированием требуют от специалистов, чтобы классы обеспечивали безопасную связь; в модели совместной работы это редко требуется.

8.7. Опрос (Polling)

Некоторые аппаратные устройства, такие как акселерометр Pyboard, не поддерживают прерывания, и поэтому должны опрашиваться (то есть периодически проверяться). Опрос также может использоваться в сочетании с обработчиками прерываний: обработчик прерываний обслуживает оборудование и устанавливает флаг. Сопрограмма опрашивает флаг — если он установлен, происходит обработка данных и флаг сбрасывается. Лучшим подходом является использование класса Event.