В предыдущей статье вы могли узнать что такое очередь в целом и как работает FIFO-очередь asyncio.Queue. Давайте продолжим и посмотрим на примере библиотеки aiohttp как работают очереди с приоритетом asyncio.PriorityQueue.
readme
Автор предыдущей статьи про asyncio.Queue довольно подробно описал тему про FIFO очередь, но дальше, про остальные типы очередей, пишет довольно шаблонно, примеры в статье мне тоже не понравились и я решил продолжить тему частичным переводом другой статьи.
Если вы нашли ошибку, пожалуйста, используйте Ctrl+Enter и я исправлю. Спасибо!
Паттерн producer-consumer
Повторим в чем смысл паттерна producer-consumer(производитель-потребитель). Представьте себе два типа задач, разделяющих очередь. Задача A производит данные и помещает их в очередь, а задача B извлекает данные из очереди для обработки. Это и есть модель "производитель-потребитель", где задача A - производитель, а задача B - потребитель. По аналогии с супермаркетом, покупатели являются производителями, кассиры - потребителями, а очередь покупателей представляет собой очередь.
Зачем использовать паттерн производитель-потребитель
В высококонкурентных программах производители часто генерируют данные быстро, а потребители обрабатывают их медленно. Таким образом, производители должны дождаться окончания обработки данных потребителями, прежде чем продолжать генерировать данные.
Иногда потребители обрабатывают данные быстро, а производители — медленно. Это приводит к тому, что потребители ждут, пока производители сгенерируют данные, прежде чем продолжить работу. Для баланса между производителями и потребителями необходима очередь, в которой хранятся данные, произведенные производителем. Очередь выполняет роль буфера и разделяет производителей и потребителей.
Очередь с приоритетом asyncio.PriorityQueue
В этой статье мы обсудили как работает очередь типа FIFO в asyncio. Теперь давайте посмотрим как работают очереди с приоритетом asyncio.PriorityQueue.
Очередь с приоритетом (англ. priority queue) — абстрактный тип данных в программировании, поддерживающий две обязательные операции — добавить элемент и извлечь максимум[1] (минимум). Предполагается, что для каждого элемента можно вычислить его приоритет — действительное число или в общем случае элемент линейно упорядоченного множества.
Зачем использовать asyncio.PriorityQueue?
Предположим, имеется очередь, в которой стоят задачи, каждая из которых требует длительного времени обработки. Журнал ошибок или VIP-доступ пользователя - это высокоприоритетная задача, требующая немедленного внимания. Что же делать? Именно здесь на помощь приходит asyncio.PriorityQueue.
Кратко опишем реализацию asyncio.PriorityQueue
В отличие от очередей FIFO, основанных на списках, asyncio.PriorityQueue основана на кучах. Она построена с использованием структуры бинарного дерева.
Вы возможно знакомы с двоичными деревьями поиска, которые гарантируют, что самый младший узел всегда является крайним левым узлом. Однако двоичное дерево в asyncio.PriorityQueue гарантирует, что самый младший узел всегда находится наверху, поэтому узел с наивысшим приоритетом постоянно удаляется первым.
Методы для очередей с приоритетом asyncio.PriorityQueue такие же, как для asyncio.Queue. Подробнее с методами вы можете ознакомиться в этой статье
Методы класса asyncio.PriorityQueue
await queue.put(item) - для помещения элемента в очередь. Если очередь переполнена, этот метод будет ждать, пока не освободится свободный слот.
await queue.get() - для получения элемента из очереди. Если очередь пуста, этот метод будет ждать, пока элемент не станет доступен.
queue.task_done() - для индикации того, что ранее полученный элемент был обработан. Этот метод должен вызываться потребительскими корутинами после завершения работы с элементом.
-
await queue.join() - для блокировки обработки всех элементов в очереди. Этот метод должен вызываться корутинами-производителями после того, как они завершат занесение элементов в очередь.
Можно также использовать некоторые "некорутинные" методы очереди, например:
queue.put_nowait(item) - чтобы поместить элемент в очередь без блокировки. Если очередь переполнена, этот метод вызовет исключение QueueFull.
queue.get_nowait() - для получения элемента из очереди без блокировки. Если очередь пуста, то этот метод вызовет исключение QueueEmpty.
queue.qsize() - для получения количества элементов в очереди.
queue.empty() - для проверки, пуста ли очередь.
queue.full() - для проверки заполнения очереди.
Пример использования asyncio.PriorityQueue в реальном мире
Проиллюстрируем использование asyncio.PriorityQueue на примере реального сценария. Представьте, что у нас есть API сервиса заказов. API требует времени на обработку каждого заказа, но мы не можем заставлять пользователей ждать слишком долго. Поэтому, когда пользователь размещает заказ, API сначала помещает его в очередь, позволяя фоновой задаче обработать его асинхронно и сразу же вернуть сообщение пользователю.
Данный API принимает заказы от двух типов пользователей: обычных и VIP. При этом необходимо следить за тем, чтобы заказы VIP-пользователей обрабатывались с наивысшим приоритетом.
Давай рассмотрим реализацию программы с приоритетной очередью на примере aiohttp:
import asyncio
from asyncio import PriorityQueue, Task
from dataclasses import dataclass, field
from enum import IntEnum
from random import randrange
from aiohttp import web
from aiohttp.web_app import Application
from aiohttp.web_request import Request
from aiohttp.web_response import Response
app = Application()
routers = web.RouteTableDef()
QUEUE_KEY = "QUEUE_KEY"
TASK_KEY = "TASK_KEY"
class UserType(IntEnum):
POWER_USER = 1
NORMAL_USER = 2
@dataclass(order=True)
class WorkItem:
user_type: UserType
order_delay: int = field(compare=False)
Сначала мы определяем Enum(перечисление), обозначающее две категории: обычные пользователи и VIP-пользователи.
Затем с помощью dataclass определяем заказ пользователя, который содержит тип пользователя и продолжительность обработки заказа. Продолжительность обработки заказа не учитывается при сортировке приоритетов.
Дальше мы определяем метод-потребитель process_order_worker, который получает заказы из очереди и имитирует их обработку. Не забудьте использовать queue.task_done(), чтобы сообщить очереди, что мы закончили обработку заказа.
async def process_order_worker(worker_id: int, queue: PriorityQueue):
while True:
work_item: WorkItem = await queue.get()
print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}")
await asyncio.sleep(work_item.order_delay)
print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}")
queue.task_done()
После этого мы реализуем API заказа, используя aiohttp. Этот API отвечает на запросы пользователей, генерирует объект заказа и помещает его в asyncio.PriorityQueue. Затем он немедленно возвращает ответ пользователю, что позволяет избежать времени ожидания.
@routers.post("/order")
async def order(request: Request) -> Response:
queue: PriorityQueue = app[QUEUE_KEY]
body = await request.json()
user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER
work_item = WorkItem(user_type, randrange(5))
await queue.put(work_item)
return Response(body="order placed!")
При запуске программы мы используем команду create_order_queue для инициализации очереди и упорядочивания задач потребления.
async def create_order_queue(app: Application):
print("create_order_queue: Begin to initialize queue and tasks.")
queue: PriorityQueue = PriorityQueue(10)
tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)]
app[QUEUE_KEY] = queue
app[TASK_KEY] = tasks
print("create_order_queue: Initialize queue and tasks success..")
При завершении работы программы мы используем команду destroy_order_queue, чтобы убедиться, что все заказы в очереди обработаны и фоновые задачи корректно закрыты.
Функция queue.join() будет ожидать обработки всех данных в очереди. asyncio.wait_for
устанавливает таймаут в 20 секунд, по истечении которого она больше не будет ожидать завершения работы queue.join().
async def destroy_order_queue(app: Application):
queue: PriorityQueue = app[QUEUE_KEY]
tasks: list[Task] = app[TASK_KEY]
try:
print("destroy_order_queue: Wait for 20 sec to let all work done.")
await asyncio.wait_for(queue.join(), timeout=20.0)
except Exception as e:
print("destroy_order_queue: Cancel all tasks.")
[task.cancel() for task in tasks]
app.add_routes(routers)
app.on_startup.append(create_order_queue)
app.on_shutdown.append(destroy_order_queue)
web.run_app(app)
Полный листинг кода программы
import asyncio
from asyncio import PriorityQueue, Task
from dataclasses import dataclass, field
from enum import IntEnum
from random import randrange
from aiohttp import web
from aiohttp.web_app import Application
from aiohttp.web_request import Request
from aiohttp.web_response import Response
app = Application()
routers = web.RouteTableDef()
QUEUE_KEY = "QUEUE_KEY"
TASK_KEY = "TASK_KEY"
class UserType(IntEnum):
POWER_USER = 1
NORMAL_USER = 2
@dataclass(order=True)
class WorkItem:
user_type: UserType
order_delay: int = field(compare=False)
async def process_order_worker(worker_id: int, queue: PriorityQueue):
while True:
work_item: WorkItem = await queue.get()
print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}")
await asyncio.sleep(work_item.order_delay)
print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}")
queue.task_done()
@routers.post("/order")
async def order(request: Request) -> Response:
queue: PriorityQueue = app[QUEUE_KEY]
body = await request.json()
user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER
work_item = WorkItem(user_type, randrange(5))
await queue.put(work_item)
return Response(body="order placed!")
async def create_order_queue(app: Application):
print("create_order_queue: Begin to initialize queue and tasks.")
queue: PriorityQueue = PriorityQueue(10)
tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)]
app[QUEUE_KEY] = queue
app[TASK_KEY] = tasks
print("create_order_queue: Initialize queue and tasks success..")
async def destroy_order_queue(app: Application):
queue: PriorityQueue = app[QUEUE_KEY]
tasks: list[Task] = app[TASK_KEY]
try:
print("destroy_order_queue: Wait for 20 sec to let all work done.")
await asyncio.wait_for(queue.join(), timeout=20.0)
except Exception as e:
print("destroy_order_queue: Cancel all tasks.")
[task.cancel() for task in tasks]
app.add_routes(routers)
app.on_startup.append(create_order_queue)
app.on_shutdown.append(destroy_order_queue)
web.run_app(app)
Мы можем протестировать эту реализацию, используя HTTP-запрос PyCharm (только в Pro версии):
HTTP-запросы
POST http://localhost:8080/order
Content-Type: application/json
{"power_user": "True"}
###
POST http://localhost:8080/order
Content-Type: application/json
{"power_user": "False"}
###
POST http://localhost:8080/order
Content-Type: application/json
{"power_user": "False"}
###
POST http://localhost:8080/order
Content-Type: application/json
{"power_user": "True"}
Или, например, так, если у вас community версия:
import requests
from random import randint
url = 'http://localhost:8080/order'
myobj = [{"power_user": "True"}, {"power_user": "False"}]
for i in range(5):
requests.post(url, json=myobj[randint(0,1)])
Как видите, две высокоприоритетные задачи обрабатываются, как и ожидалось. Отлично!
Выводы
В этой статье мы повторили для чего нужен паттерн производитель-потребитель:
Балансировка между производителями и потребителями, максимизация использования ресурсов.
Развязывание системы, позволяющее производителям и потребителям масштабироваться независимо друг от друга.
Также, на реальном примере увидели, как использовать asyncio.PriorityQueue для решения ситуаций, когда задачам требуется расстановка приоритетов.
Асинхронное программирование на Python - мощный инструмент, а паттерн производитель-потребитель с asyncio.Queue - это универсальный подход к обработке параллелизма и расстановке приоритетов в ваших приложениях.
Контакты автора статьи
Пожалуйста, подпишитесь, если вы нашли статьи полезными, и получайте новые истории на свой почтовый ящик. Если у вас есть вопросы, вы можете найти меня на LinkedIn или в Twitter(X).