Примечание

Это перевод статьи об asyncio.Queue. Позже планирую дополнить тему статьями про asyncio.LifoQueue и asyncio.PriorityQueue. Я постарался дополнить перевод и исправить ошибки - в оригинале они были, но возможно что-то пропустил. Если вы нашли ошибку, пожалуйста, используйте Ctrl+Enter и я исправлю. Спасибо!

При написании приложений для обработки событий или данных других типов нам часто бывает нужен механизм для хранения событий и распределения их между несколькими исполнителями. Затем исполнители могут конкурентно сделать с этими событиями все, что нужно; это позволяет сэкономить немного времени по сравнению с последовательной обработкой событий.

Библиотека asyncio предоставляет для этой цели реализацию асинхронной очереди: asyncio.Queue для очереди типа FIFO, asyncio.PriorityQueue для приоритетных очередей и asyncio.LifoQueue для очереди типа LIFO.

Мы можем добавить в очередь данные и запустить несколько конкурентных исполнителей, которые будут извлекать данные из очереди и обрабатывать их по мере готовности. Такую схему работы часто называют производитель–потребитель(паттерн producer-consumer). Одна сторона порождает данные или события, которые надлежит обработать; их обработка может занять длительное время. Очередь позволяет делегировать длительные операции фоновым задачам, сохранив отзывчивость пользовательского интерфейса. Мы помещаем элемент в очередь для последующей обработки и информируем пользователя, что работа началась в фоне.

У асинхронных очередей есть и дополнительное преимущество – они дают механизм ограничения конкурентности, поскольку обычно с очередью работает конечное число задач-исполнителей.

Из книги "Asyncio и конкурентное программирование на Python" Мэттью Фаулера

В этом уроке вы узнаете, как обмениваться данными между корутинами с помощью asyncio.Queue в Python.

Что такое очередь?

asyncio.Queue предоставляет FIFO-очередь для использования с корутинами, но прежде чем мы погрузимся в детали работы asyncio.Queue, давайте рассмотрим очереди в общем виде в Python.

Queue

Queue(очередь) - это структура данных, в которую элементы могут быть добавлены вызовом put() и извлечены вызовом метода get(). Python предоставляет потокобезопасную очередь с помощью класса queue.Queue, который позволяет потокам обмениваться объектами друг с другом.

Модуль queue реализует очереди с несколькими производителями( multi-producer) и потребителями(multi-consumer). Он особенно полезен в потоковом программировании, когда требуется безопасно обмениваться информацией между несколькими потоками. Класс Queue этого модуля реализует всю необходимую семантику блокировки.

Queue – A synchronized queue class

Безопасная для процессов очередь обеспечивается с помощью класса multiprocessing.Queue. Эта очередь является одновременно потокобезопасной и процессобезопасной и позволяет процессам совместно использовать данные Python-объектов.

Обе очереди управляют данными в порядке очереди FIFO, что означает, что элементы извлекаются из очереди в том порядке, в котором они были добавлены. Первые элементы, добавленные в очередь, будут получены первыми. Это противоположно другим типам очередей, таким как очередь "последний в очереди" и "первый из очереди", а также приоритетным очередям.

Далее рассмотрим очередь asyncio.

Асинхронная очередь asyncio.Queue

В Python асинхронные очереди - это тип структуры данных, в которой элементы могут храниться и извлекаться в порядке поступления (FIFO). Они предназначены для использования с корутинами - функциями, которые могут быть приостановлены и возобновлены асинхронно. Для создания очередей asyncio необходимо использовать класс asyncio.Queue.

Очередь asyncio.Queue не является ни потокобезопасной, ни процессоробезопасной. Это означает, что она не может использоваться потоками или процессами для совместного использования объектов Python и предназначена только для использования корутинами в рамках одного потока Python, например, одного цикла событий.

asyncio.Queue предназначен для использования в asyncio-программах в виде корутин? поэтому некоторые методы этого класса фактически являются корутинами и должны использовать вместе await. Это означает, что asyncio.Queue не может быть использован вне asyncio-программы.

Когда это может быть полезно?

Очереди в asyncio могут быть использованы для:

  • Реализация веб-краулеров, которые параллельно получают и анализируют веб-страницы.

  • Построение брокеров сообщений, распределяющих задачи между несколькими рабочими.

  • Разработка чат-приложений, которые асинхронно отправляют и получают сообщения.

  • Напишите свой вариант в комментарии)

Методы класса asyncio.Queue
  • await queue.put(item) - для помещения элемента в очередь. Если очередь переполнена, этот метод будет ждать, пока не освободится свободный слот.

  • await queue.get() - для получения элемента из очереди. Если очередь пуста, этот метод будет ждать, пока элемент не станет доступен.

  • queue.task_done() - для индикации того, что ранее полученный элемент был обработан. Этот метод должен вызываться потребительскими корутинами после завершения работы с элементом.

  • await queue.join() - для блокировки обработки всех элементов в очереди. Этот метод должен вызываться корутинами-производителями после того, как они завершат занесение элементов в очередь.

    Можно также использовать некоторые "некорутинные" методы очереди, например:

  • queue.put_nowait(item) - чтобы поместить элемент в очередь без блокировки. Если очередь переполнена, этот метод вызовет исключение QueueFull.

  • queue.get_nowait() - для получения элемента из очереди без блокировки. Если очередь пуста, то этот метод вызовет исключение QueueEmpty.

  • queue.qsize() - для получения количества элементов в очереди.

  • queue.empty() - для проверки, пуста ли очередь.

  • queue.full() - для проверки заполнения очереди.

Как использовать asyncio.Queue

В этом разделе мы рассмотрим примеры использования класс asyncio.Queue, включая создание и настройку экземпляра, добавление и удаление элементов, запросы к свойствам очереди и управление задачами.

Создание очереди asyncio.Queue

Мы можем создать asyncio.Queue, создав экземпляр этого класса. По умолчанию очередь не будет ограничена по вместимости. Например:

# create a queue with no size limit
queue = asyncio.Queue()

asyncio.Queue принимает один аргумент конструктора - "maxsize", который по умолчанию равен нулю (без ограничений). Например:

# создание очереди без ограничения размера очереди
queue = asyncio.Queue(maxsize=0)

Мы можем установить ограничение на размер очереди. Эффект ограничения размера означает, что когда очередь заполнена и корутины пытаются добавить объект, они блокируются до тех пор, пока не освободится место, или терпят неудачу, если используется неблокирующий метод. Поскольку аргумент "maxsize" является первым позиционным аргументом, не обязательно указывать его по имени. Например:

# создание очереди с ограничением по размеру
queue = asyncio.Queue(maxsize=100)
queue2 = asyncio.Queue(100)

Добавление элементов в очередь asyncio.Queue

Объекты Python могут быть добавлены в очередь с помощью метода put(). На самом деле это корутин, который необходимо ожидать. Причина заключается в том, что вызывающая корутина может заблокироваться, если очередь переполнена. Например:

# добавить объект в очередь
await queue.put(item)

Элемент также может быть добавлен в очередь без блокировки с помощью метода put_nowait(). Этот метод не является корутиной и либо добавляет элемент, либо возвращается немедленно, либо выдает исключение asyncio.QueueFull, если очередь переполнена и элемент не может быть добавлен.

try:
	# попытка добавить элемент
	queue.put_nowait(item)
except asyncio.QueueFull:
	# ...

Получение элементов из очереди asyncio.Queue

Элементы могут быть получены из очереди путем вызова метода get(). На самом деле это корутин, который необходимо ожидать(await). Причина заключается в том, что в очереди может не быть элементов для получения в данный момент, и вызывающая корутина должна блокироваться до тех пор, пока элемент не станет доступным.

# извлечение элемента из очереди
item = await queue.get()

Извлеченный элемент будет самым старым из добавленных, при упорядочении FIFO. Элемент может быть получен из очереди без блокировки с помощью метода get_nowait(). Этот метод не является корутиной и вернет элемент немедленно, если он доступен, в противном случае произойдет сбой с исключением asyncio.QueueEmpty. Например:

try:
	# попытка извлечь элемент
	item = queue.get_nowait()
except asyncio.QueueEmpty:
	# ...

Запрос размера очереди asyncio.Queue

Мы можем получить фиксированный размер очереди через свойство "maxsize". Например:

# сообщить размер очереди
print(queue.maxsize)

Проверить, пуста ли очередь, можно с помощью функции empty(), которая возвращает True, если очередь не содержит элементов, и False в противном случае. Например:

# проверка что очередь пуста
if queue.empty():
	# ...

Мы также можем проверить, заполнена ли очередь, с помощью метода full(), который возвращает True, если очередь заполнена, и False в противном случае. Например:

# проверка заполнение очереди
if queue.full():
	# ...

Методы join() и task_done() в asyncio.Queue

Элементы в очереди могут рассматриваться как задачи, которые могут быть помечены потребительскими(consumer) корутинами как процессы.

Этого можно добиться, если потребительские корутины будут извлекать элементы из очереди с помощью get() или get_nowait() и после обработки помечать их методом task_done(). Например:

# извлечение элемента из очереди
item = await queue.get()
# process the item
# ...
# отметить элемент как процесс
queue.task_done()

Другие программы могут быть заинтересованы в том, чтобы узнать, когда все элементы, добавленные в очередь, будут получены и помечены как выполненные. Этого можно добиться, если программа ожидает выполнения программы join() в очереди.

Корутина join() не вернется, пока все элементы, добавленные в очередь до ее вызова, не будут помечены как выполненные.

Например:

# дождаться, пока все элементы очереди будут помечены как выполненные
await queue.join()

Если очередь пуста или все элементы уже помечены как выполненные, то корутина join() возвращается немедленно. Теперь, когда мы знаем, как использовать asyncio.Queue, давайте рассмотрим несколько работающих примеров.

Пример использования asyncio.Queue

Рассмотрим использование класса asyncio.Queue на рабочем примере. В этом примере мы создадим корутину-производителя, которая будет генерировать десять случайных чисел и помещать их в очередь. Также будет создана корутина-потребитель, которая будет получать числа из очереди и сообщать их значения. asyncio.Queue обеспечивает возможность обмена данными между этими производящей и потребительской корутинами.

  1. Во-первых, мы определим функцию, которая будет выполняться корутиной-производителем. Задача будет выполнять десять итераций в цикле. На каждой итерации будет генерироваться новое случайное значение от 0 до 1 с помощью функции random.random(). Затем она будет спать в течение доли секунды для имитации работы, после чего поместит значение в очередь. После завершения задачи она поместит в очередь значение None, чтобы сигнализировать корутине-потребителю об отсутствии дальнейшей работы. Приведенная ниже программа producer() реализует это, принимая в качестве аргумента экземпляр очереди.

  2. Далее мы можем определить функцию, которую будет выполнять корутину-потребитель. Задача будет циклически повторяться. На каждой итерации она будет получать элемент из очереди и блокировать его, если он еще не доступен. Если элемент, полученный из очереди, имеет значение None, то задача прервет цикл и завершит работу программы. В противном случае значение сообщается. Приведенная ниже корутина consumer() реализует это и принимает в качестве аргумента экземпляр очереди.

  3. Наконец, в основной программе мы можем создать экземпляр общей очереди. Затем мы можем создать и запустить корутины производителя и потребителя и дождаться их завершения с помощью метода asyncio.gather()

# example of using an asyncio queue
from random import random
import asyncio
 
#1 корутина-производитель
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # блокировка для симуляции работы
        await asyncio.sleep(value)
        # добавление в очередь
        await queue.put(value)
    # отправление сигнал о готовности
    await queue.put(None)
    print('Producer: Done')
 
#2 корутина потребитель
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # получить единицу работы
        item = await queue.get()
        # проверка наличия сигнала останова
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
 
# точка входа корутины
async def main():
    # создание общей очереди
    queue = asyncio.Queue()
    # управлять производителем и потребителями
    await asyncio.gather(producer(queue), consumer(queue))
 
# запуск программы
asyncio.run(main())

При запуске примера сначала создается общий экземпляр asyncio.Queue. Затем создается корутина producer, которому передается экземпляр очереди. Затем запускается корутина-потребитель, а главная корутина main() блокируется до тех пор, пока не завершатся обе корутины. Кораутин-производитель генерирует новое случайное значение для каждой итерации задачи, блокирует и добавляет его в очередь. Кораутин-потребитель ожидает поступления элементов в очередь, затем потребляет их по одному, сообщая их значение. Наконец, задача-производитель завершается, в очередь помещается значение None, и работа программы завершается. Кораутин-потребитель получает значение None, прерывает свой цикл и также завершает работу.

Это наглядно показывает, как можно использовать asyncio.Queue для удобного обмена данными между "производящими" и "потребительскими" корутинами. Обратите внимание, что вывод программы будет отличаться при каждом запуске, поскольку в ней используются случайные числа.

Вывод программы
Producer: Running
Consumer: Running
>got 0.7559246569022605
>got 0.965203750033905
>got 0.49834912260024233
>got 0.22783211775499135
>got 0.07775542407106295
>got 0.5997647474647314
>got 0.7236540952500915
>got 0.7956407178426339
>got 0.11256095725867177
Producer: Done
>got 0.9095338767572713
Consumer: Done

Далее рассмотрим, как можно получить значения из очереди без блокировки.

Пример работы Asyncio Queue без блокировки

Мы можем получать значения из очереди asyncio.Queue без блокировки. Это может быть полезно, если мы хотим использовать занятое ожидание в корутине-потребителе для проверки другого состояния или выполнения других задач в ожидании поступления данных в очередь. Мы можем обновить пример из предыдущего раздела, чтобы получать элементы из очереди без блокировки. Этого можно добиться, вызвав метод get_nowait(). Функция get_nowait() возвращается немедленно.

Если в очереди есть значение, которое необходимо получить, то оно возвращается. В противном случае, если очередь пуста, возникает исключение asyncio.QueueEmpty, которое может быть обработано. В данном случае, если нет значения, которое можно получить из очереди, мы сообщаем об этом и засыпаем на долю секунды. Затем мы продолжим работу, что приведет к возврату к началу цикла ожидания занятости потребителя.

# пример использования очереди asyncio без блокировки
from random import random
import asyncio
 
# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # генерация значения
        value = random()
        # блокировка для симуляции работы
        await asyncio.sleep(value)
        # добавление в очередь
        await queue.put(value)
    # отправление сигнал о готовности
    await queue.put(None)
    print('Producer: Done')
 
# корутина для выполнения работы
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # получение единицу работы без блокировки
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            print('Consumer: got nothing, waiting a while...')
            await asyncio.sleep(0.5)
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
 
# точка входа корутины
async def main():
    # создание общей очереди
    queue = asyncio.Queue()
    # запуск проивзводителя и потребителя
    await asyncio.gather(producer(queue), consumer(queue))
 
# запуск программы
asyncio.run(main())

Запуск примера создает общую очередь.Queue, затем, как и прежде, запускает корутины consumer и producer. Корутина-производитель генерирует, блокирует и добавляет элементы в очередь. Корутина-потребитель пытается получить значение из очереди. Если значение не получено, то возникает исключение asyncio.QueueEmpty, которое обрабатывается путем выдачи сообщения, "сна" на доли секунды и повторного запуска цикла ожидания.

В противном случае, если в очереди есть значение, потребитель получит его и сообщит об этом в обычном режиме. Мы видим, что сообщения от корутины-потребителя заняты ожиданием поступления новых данных в очередь. Это показывает, как можно получать элементы из asyncio.Queue без блокировки. Обратите внимание, что вывод программы будет отличаться при каждом запуске, поскольку в ней используются случайные числа.

Вывод программы без блокировки
Producer: Running
Consumer: Running
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.896558357626797
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.6498874449486562
>got 0.14862534743361389
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.9271724543351715
Consumer: got nothing, waiting a while...
>got 0.6659822945662333
>got 0.11205862071348183
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.9490125408623084
Consumer: got nothing, waiting a while...
>got 0.150509682492045
>got 0.23281901173320807
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
Producer: Done
>got 0.8999468879239988
Consumer: Done

Далее рассмотрим, как можно получить значения из очереди с таймаутом.

Пример очереди Asyncio с таймаутом

Мы можем получать значения из очереди asyncio.Queue с помощью блокировки, но с ограничением по таймауту. Это позволяет потребительской корутине не только блокировать ожидание поступления значений в очередь, но и выполнять другие задачи во время ожидания. Это может быть более эффективно, чем занятое ожидание без каких-либо блокирующих вызовов.

Мы можем обновить приведенный выше пример, чтобы использовать таймаут при получении элементов из очереди в корутине-потребителе. Этого можно добиться, вызывая корутину get() как обычно.

В отличие от queue.Queue, asyncio.Queue не поддерживает тайм-аут напрямую. Вместо этого мы можем обернуть корутину get() в корутину wait_for(), которая поддерживает таймаут. Если таймаут истекает до завершения работы корутины get(), то возникает исключение asyncio.TimeoutError, которое может быть обработано. Например:

# пример использования очереди asyncio с таймаутом
from random import random
import asyncio
 
# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue
        await queue.put(value)
    # send an all done signal
    await queue.put(None)
    print('Producer: Done')
 
# consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        try:
            # retrieve the get() awaitable
            get_await = queue.get()
            # await the awaitable with a timeout
            item = await asyncio.wait_for(get_await, 0.5)
        except asyncio.TimeoutError:
            print('Consumer: gave up waiting...')
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
 
# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # run the producer and consumers
    await asyncio.gather(producer(queue), consumer(queue))
 
# start the asyncio program
asyncio.run(main())

Запуск примера создает общую очередь asyncio.Queue, затем, как и прежде, запускает корутины consumer и producer. Корутина-производитель будет генерировать, блокировать и добавлять элементы в очередь. Корутин-потребитель попытается получить значение из очереди. Она блокируется на время таймаута. Если до истечения тайм-аута значение не будет получено, то возникнет исключение asyncio.TimeoutError, которое будет обработано путем выдачи сообщения и повторного запуска цикла ожидания. В противном случае, если в очереди есть значение, потребитель получит его и сообщит об этом в обычном режиме.

Мы видим, что сообщения от корутины-потребителя заняты ожиданием поступления новых данных в очередь. Здесь показано, как получать элементы из asyncio.Queue с помощью блокировки по таймауту. Обратите внимание, что вывод программы будет отличаться при каждом запуске, поскольку в ней используются случайные числа.

Вывод программы с таймаутом
Producer: Running
Consumer: Running
Consumer: gave up waiting...
>got 0.8506665865206575
Consumer: gave up waiting...
>got 0.851355213428328
>got 0.3050736798012632
Consumer: gave up waiting...
>got 0.7019959682053681
Consumer: gave up waiting...
>got 0.9753069917130328
Consumer: gave up waiting...
>got 0.7813291071437218
Consumer: gave up waiting...
>got 0.7831885826899522
Consumer: gave up waiting...
>got 0.8001066750131507
Consumer: gave up waiting...
>got 0.9564293628868409
Producer: Done
>got 0.41507431394001704
Consumer: Done

Далее рассмотрим, как можно ожидать в очереди и помечать задачи как завершенные в очереди.

Пример Asyncio Queue Join и Task Done

В предыдущих примерах мы посылали в очередь специальное сообщение (None), чтобы указать, что все задачи выполнены. Альтернативный подход заключается в том, чтобы заставить корутины ждать непосредственно в очереди, а корутину-потребителя помечать задачи как выполненные. Этого можно добиться с помощью функций join() и task_done() в asyncio.Queue. Обновим нашу программу.

Корутина-producer может быть обновлена, чтобы больше не посылать в очередь значение None, указывающее на отсутствие дальнейших задач. Корутина-consumer может быть обновлена, чтобы больше не проверять сообщения None и помечать каждую задачу как выполненную с помощью вызова task_done().

Корутина-producer будет работать до тех пор, пока в очередь не перестанут добавляться задачи, и завершится. Корутина-consumer теперь будет работать вечно. Сначала мы запустим корутину-потребителя как отдельную самостоятельную задачу. Затем мы выполним и будем ожидать выполнения корутины-производителя до тех пор, пока она не будет завершена и все элементы не будут добавлены в очередь.

Наконец, мы будем ждать, пока все добавленные в очередь элементы не будут помечены как выполненные. Давайте посмотрим на эту программу:

# пример использования join и task_done с очередью asyncio
from random import random
import asyncio
 
# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue
        await queue.put(value)
    print('Producer: Done')
 
# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # report
        print(f'>got {item}')
        # block while processing
        if item:
            await asyncio.sleep(item)
        # mark the task as done
        queue.task_done()
 
# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # start the consumer
    _ = asyncio.create_task(consumer(queue))
    # start the producer and wait for it to finish
    await asyncio.create_task(producer(queue))
    # wait for all items to be processed
    await queue.join()
 
# start the asyncio program
asyncio.run(main())

Запуск этого программы создает общую очередь asyncio.Queue, затем, как и прежде, запускает корутины consumer и producer. Корутина-производитель генерирует, блокирует и добавляет элементы в очередь. Корутина-потребитель пытается получить значения из очереди. После того как корутина-производитель добавит все свои десять элементов, она завершится. Корутина-потребитель будет работать вечно в фоновом режиме. После завершения работы корутины-производителя главная корутина main() блокирует очередь. После того как корутина-потребитель обработает все элементы, добавленные в очередь, корутина main() возобновит работу и завершится. После этого программа завершается, а корутина-потребитель продолжает работать в фоновом режиме, но уже без обработки.

Здесь было показано, как отмечать задания в очереди как выполненные и как ждать в очереди завершения всех работ. Обратите внимание на то, что вывод программы будет отличаться при каждом запуске, поскольку в ней используются случайные числа.

Вывод программы с функциями join() и task_done()
Consumer: Running
Producer: Running
>got 0.98439852757525
>got 0.31319007221013795
>got 0.9398085059848861
>got 0.14351842921376057
>got 0.24629462902135835
>got 0.4488704344186214
>got 0.19476785739518376
>got 0.8393990524378161
>got 0.3269099694795079
Producer: Done
>got 0.8274430954459486

Далее рассмотрим, как можно работать с очередью ограниченного размера.

Пример очереди Asyncio с ограниченным размером

Мы можем ограничить емкость очереди. Это может быть полезно при наличии большого числа производителей или медленных потребителей. Это позволяет ограничить количество задач, которые могут находиться в памяти в каждый момент времени, ограничивая общее использование памяти приложением. Этого можно добиться, задав аргумент "maxsize" в конструкторе очереди при ее настройке. Поскольку это первый и единственный позиционный аргумент, мы можем не указывать его имя в конструкторе.

Когда очередь заполнена, вызов функции put() будет блокироваться до тех пор, пока не освободится место для размещения очередного элемента в очереди. Мы можем продемонстрировать это, обновив пример из предыдущего раздела, чтобы ограничить размер очереди небольшим числом, например 2 элемента, и чтобы скромное число производителей, например 5, пыталось заполнить очередь. Это позволит держать очередь заполненной большую часть времени, вынуждая производителей блокировать ее при добавлении элементов.

Во-первых, мы можем задать емкость очереди при ее построении. Далее мы можем создать пять корутин-производителей, которые будут одновременно добавлять в очередь задачи с попытками выполнения. Это можно сделать в виде списка, который затем можно распаковать на отдельные выражения в вызове asyncio.gather().

После того как все производители завершат свою работу, корутина main() может присоединиться к очереди и ждать, пока потребитель отметит все элементы как выполненные.

# пример использования очереди asyncio с ограниченной емкостью
from random import random
import asyncio
 
# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue, may block
        await queue.put(value)
    print('Producer: Done')
 
# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # report
        print(f'>got {item}')
        # block while processing
        if item:
            await asyncio.sleep(item)
        # mark as completed
        queue.task_done()
    # all done
    print('Consumer: Done')
 
# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue(2)
    # start the consumer
    _ = asyncio.create_task(consumer(queue))
    # create many producers
    producers = [producer(queue) for _ in range(5)]
    # run and wait for the producers to finish
    await asyncio.gather(*producers)
    # wait for the consumer to process all items
    await queue.join()
 
# start the asyncio program
asyncio.run(main())

При запуске этого примера сначала запускается корутина-потребитель, затем все пять корутин-производителей. Затем корутина main() блокируется до завершения работы корутин-производителей. Затем каждая из пяти корутин-производителей пытается добавить в очередь десять элементов настолько быстро, насколько это возможно. Емкость очереди настолько ограничена, что эти программы большую часть времени будут блокировать вызов функции put(), пока не освободится место для добавления нового элемента в очередь. Корутина-потребитель будет потреблять элементы из очереди так быстро, как только сможет, сообщать их значения и помечать их как выполненные. Она будет работать вечно в качестве фоновой задачи.

После завершения работы коуртин-произвиотелей корутина main() возобновляет работу и блокирует очередь, ожидая, пока все оставшиеся задачи будут потреблены и помечены как выполненные корутиной-потребителем.

В этом примере было показано, как использовать ограниченную емкость asyncio.Queue и как сочетать это с пометкой задач как выполненных и присоединением к очереди. Обратите внимание, что вывод программы будет отличаться при каждом запуске, поскольку в ней используются случайные числа.

Вывод программы с ограниченным размером очереди
Consumer: Running
Producer: Running
Producer: Running
Producer: Running
Producer: Running
Producer: Running
>got 0.0798149651109541
>got 0.5513864113584395
>got 0.8149184098780632
>got 0.8561030038666221
>got 0.8225047439580798
>got 0.992630421268497
>got 0.27449486943860757
>got 0.10489939965437134
>got 0.9004478449122744
>got 0.9442262069705694
>got 0.9517905758143422
>got 0.38578513687892313
>got 0.21314357809327322
>got 0.006412317984848315
>got 0.522391949578982
>got 0.4289851852631642
>got 0.5237185610606917
>got 0.7128146789112292
>got 0.2424277811353306
>got 0.44543328087703804
>got 0.36961101864563994
>got 0.46362053301168127
>got 0.853341848695711
>got 0.5234863755930941
>got 0.04593820030932505
>got 0.0554357759717663
>got 0.008185842872241
>got 0.9700101228192052
>got 0.8048086100285801
>got 0.689831779214825
>got 0.3245915440087028
>got 0.21373695813973959
>got 0.9315929425005609
>got 0.9382045140049264
>got 0.925811547635268
>got 0.6079025826247971
>got 0.1675603246130124
>got 0.8861271320774468
>got 0.5610211824876841
>got 0.6335242295962565
Producer: Done
>got 0.5251525663901687
>got 0.8263850076196841
>got 0.06117578863178552
>got 0.7066342593552792
Producer: Done
>got 0.883204743564828
Producer: Done
>got 0.06293969547023037
Producer: Done
>got 0.5876241223957309
>got 0.7631673862150006
Producer: Done
>got 0.07354652534254391
>got 0.25988256916156316

Рекомендации автора статьи

Комментарии (0)