В предыдущей статье вы могли узнать что такое очередь в целом и как работает FIFO-очередь asyncio.Queue. Давайте продолжим и посмотрим на примере библиотеки aiohttp как работают очереди с приоритетом asyncio.PriorityQueue.

readme

Автор предыдущей статьи про asyncio.Queue довольно подробно описал тему про FIFO очередь, но дальше, про остальные типы очередей, пишет довольно шаблонно, примеры в статье мне тоже не понравились и я решил продолжить тему частичным переводом другой статьи.

Если вы нашли ошибку, пожалуйста, используйте Ctrl+Enter и я исправлю. Спасибо!

Паттерн producer-consumer

Повторим в чем смысл паттерна producer-consumer(производитель-потребитель). Представьте себе два типа задач, разделяющих очередь. Задача A производит данные и помещает их в очередь, а задача B извлекает данные из очереди для обработки. Это и есть модель "производитель-потребитель", где задача A - производитель, а задача B - потребитель. По аналогии с супермаркетом, покупатели являются производителями, кассиры - потребителями, а очередь покупателей представляет собой очередь.

Зачем использовать паттерн производитель-потребитель

В высококонкурентных программах производители часто генерируют данные быстро, а потребители обрабатывают их медленно. Таким образом, производители должны дождаться окончания обработки данных потребителями, прежде чем продолжать генерировать данные.

Иногда потребители обрабатывают данные быстро, а производители — медленно. Это приводит к тому, что потребители ждут, пока производители сгенерируют данные, прежде чем продолжить работу. Для баланса между производителями и потребителями необходима очередь, в которой хранятся данные, произведенные производителем. Очередь выполняет роль буфера и разделяет производителей и потребителей.

Image by Peng Qian
Image by Peng Qian

Очередь с приоритетом asyncio.PriorityQueue

В этой статье мы обсудили как работает очередь типа FIFO в asyncio. Теперь давайте посмотрим как работают очереди с приоритетом asyncio.PriorityQueue.

Определение из wikipedia:

Очередь с приоритетом (англ. priority queue) — абстрактный тип данных в программировании, поддерживающий две обязательные операции — добавить элемент и извлечь максимум[1] (минимум). Предполагается, что для каждого элемента можно вычислить его приоритет — действительное число или в общем случае элемент линейно упорядоченного множества.

Зачем использовать asyncio.PriorityQueue?

Предположим, имеется очередь, в которой стоят задачи, каждая из которых требует длительного времени обработки. Журнал ошибок или VIP-доступ пользователя - это высокоприоритетная задача, требующая немедленного внимания. Что же делать? Именно здесь на помощь приходит asyncio.PriorityQueue.

Кратко опишем реализацию asyncio.PriorityQueue

В отличие от очередей FIFO, основанных на списках, asyncio.PriorityQueue основана на кучах. Она построена с использованием структуры бинарного дерева.

Вы возможно знакомы с двоичными деревьями поиска, которые гарантируют, что самый младший узел всегда является крайним левым узлом. Однако двоичное дерево в asyncio.PriorityQueue гарантирует, что самый младший узел всегда находится наверху, поэтому узел с наивысшим приоритетом постоянно удаляется первым.

Слева - бинарное дерево в PriorityQueue, справа - дерево бинарного поиска. Image by Peng Qian.
Слева - бинарное дерево в PriorityQueue, справа - дерево бинарного поиска. Image by Peng Qian.

Методы для очередей с приоритетом asyncio.PriorityQueue такие же, как для asyncio.Queue. Подробнее с методами вы можете ознакомиться в этой статье

Методы класса asyncio.PriorityQueue
  • await queue.put(item) - для помещения элемента в очередь. Если очередь переполнена, этот метод будет ждать, пока не освободится свободный слот.

  • await queue.get() - для получения элемента из очереди. Если очередь пуста, этот метод будет ждать, пока элемент не станет доступен.

  • queue.task_done() - для индикации того, что ранее полученный элемент был обработан. Этот метод должен вызываться потребительскими корутинами после завершения работы с элементом.

  • await queue.join() - для блокировки обработки всех элементов в очереди. Этот метод должен вызываться корутинами-производителями после того, как они завершат занесение элементов в очередь.

    Можно также использовать некоторые "некорутинные" методы очереди, например:

  • queue.put_nowait(item) - чтобы поместить элемент в очередь без блокировки. Если очередь переполнена, этот метод вызовет исключение QueueFull.

  • queue.get_nowait() - для получения элемента из очереди без блокировки. Если очередь пуста, то этот метод вызовет исключение QueueEmpty.

  • queue.qsize() - для получения количества элементов в очереди.

  • queue.empty() - для проверки, пуста ли очередь.

  • queue.full() - для проверки заполнения очереди.

Пример использования asyncio.PriorityQueue в реальном мире

Проиллюстрируем использование asyncio.PriorityQueue на примере реального сценария. Представьте, что у нас есть API сервиса заказов. API требует времени на обработку каждого заказа, но мы не можем заставлять пользователей ждать слишком долго. Поэтому, когда пользователь размещает заказ, API сначала помещает его в очередь, позволяя фоновой задаче обработать его асинхронно и сразу же вернуть сообщение пользователю.

Данный API принимает заказы от двух типов пользователей: обычных и VIP. При этом необходимо следить за тем, чтобы заказы VIP-пользователей обрабатывались с наивысшим приоритетом.

VIP-заказы обрабатываются с наивысшим приоритетом. Image by Peng Qian.
VIP-заказы обрабатываются с наивысшим приоритетом. Image by Peng Qian.

Давай рассмотрим реализацию программы с приоритетной очередью на примере aiohttp:

import asyncio
from asyncio import PriorityQueue, Task
from dataclasses import dataclass, field
from enum import IntEnum
from random import randrange

from aiohttp import web
from aiohttp.web_app import Application
from aiohttp.web_request import Request
from aiohttp.web_response import Response

app = Application()
routers = web.RouteTableDef()
QUEUE_KEY = "QUEUE_KEY"
TASK_KEY = "TASK_KEY"


class UserType(IntEnum):
    POWER_USER = 1
    NORMAL_USER = 2


@dataclass(order=True)
class WorkItem:
    user_type: UserType
    order_delay: int = field(compare=False)

Сначала мы определяем Enum(перечисление), обозначающее две категории: обычные пользователи и VIP-пользователи.

Затем с помощью dataclass определяем заказ пользователя, который содержит тип пользователя и продолжительность обработки заказа. Продолжительность обработки заказа не учитывается при сортировке приоритетов.

Дальше мы определяем метод-потребитель process_order_worker, который получает заказы из очереди и имитирует их обработку. Не забудьте использовать queue.task_done(), чтобы сообщить очереди, что мы закончили обработку заказа.

async def process_order_worker(worker_id: int, queue: PriorityQueue):
    while True:
        work_item: WorkItem = await queue.get()
        print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}")
        await asyncio.sleep(work_item.order_delay)
        print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}")
        queue.task_done()

После этого мы реализуем API заказа, используя aiohttp. Этот API отвечает на запросы пользователей, генерирует объект заказа и помещает его в asyncio.PriorityQueue. Затем он немедленно возвращает ответ пользователю, что позволяет избежать времени ожидания.

@routers.post("/order")
async def order(request: Request) -> Response:
    queue: PriorityQueue = app[QUEUE_KEY]
    body = await request.json()
    user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER
    work_item = WorkItem(user_type, randrange(5))
    await queue.put(work_item)

    return Response(body="order placed!")

При запуске программы мы используем команду create_order_queue для инициализации очереди и упорядочивания задач потребления.

async def create_order_queue(app: Application):
    print("create_order_queue: Begin to initialize queue and tasks.")
    queue: PriorityQueue = PriorityQueue(10)
    tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)]
    app[QUEUE_KEY] = queue
    app[TASK_KEY] = tasks
    print("create_order_queue: Initialize queue and tasks success..")

При завершении работы программы мы используем команду destroy_order_queue, чтобы убедиться, что все заказы в очереди обработаны и фоновые задачи корректно закрыты.

Функция queue.join() будет ожидать обработки всех данных в очереди. asyncio.wait_for устанавливает таймаут в 20 секунд, по истечении которого она больше не будет ожидать завершения работы queue.join().

async def destroy_order_queue(app: Application):
    queue: PriorityQueue = app[QUEUE_KEY]
    tasks: list[Task] = app[TASK_KEY]

    try:
        print("destroy_order_queue: Wait for 20 sec to let all work done.")
        await asyncio.wait_for(queue.join(), timeout=20.0)
    except Exception as e:
        print("destroy_order_queue: Cancel all tasks.")
        [task.cancel() for task in tasks]


app.add_routes(routers)
app.on_startup.append(create_order_queue)
app.on_shutdown.append(destroy_order_queue)
web.run_app(app)
Полный листинг кода программы
import asyncio
from asyncio import PriorityQueue, Task
from dataclasses import dataclass, field
from enum import IntEnum
from random import randrange

from aiohttp import web
from aiohttp.web_app import Application
from aiohttp.web_request import Request
from aiohttp.web_response import Response

app = Application()
routers = web.RouteTableDef()
QUEUE_KEY = "QUEUE_KEY"
TASK_KEY = "TASK_KEY"


class UserType(IntEnum):
    POWER_USER = 1
    NORMAL_USER = 2


@dataclass(order=True)
class WorkItem:
    user_type: UserType
    order_delay: int = field(compare=False)


async def process_order_worker(worker_id: int, queue: PriorityQueue):
    while True:
        work_item: WorkItem = await queue.get()
        print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}")
        await asyncio.sleep(work_item.order_delay)
        print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}")
        queue.task_done()


@routers.post("/order")
async def order(request: Request) -> Response:
    queue: PriorityQueue = app[QUEUE_KEY]
    body = await request.json()
    user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER
    work_item = WorkItem(user_type, randrange(5))
    await queue.put(work_item)

    return Response(body="order placed!")


async def create_order_queue(app: Application):
    print("create_order_queue: Begin to initialize queue and tasks.")
    queue: PriorityQueue = PriorityQueue(10)
    tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)]
    app[QUEUE_KEY] = queue
    app[TASK_KEY] = tasks
    print("create_order_queue: Initialize queue and tasks success..")


async def destroy_order_queue(app: Application):
    queue: PriorityQueue = app[QUEUE_KEY]
    tasks: list[Task] = app[TASK_KEY]

    try:
        print("destroy_order_queue: Wait for 20 sec to let all work done.")
        await asyncio.wait_for(queue.join(), timeout=20.0)
    except Exception as e:
        print("destroy_order_queue: Cancel all tasks.")
        [task.cancel() for task in tasks]


app.add_routes(routers)
app.on_startup.append(create_order_queue)
app.on_shutdown.append(destroy_order_queue)
web.run_app(app)

Мы можем протестировать эту реализацию, используя HTTP-запрос PyCharm (только в Pro версии):

HTTP-запросы
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "True"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "False"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "False"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "True"}

Или, например, так, если у вас community версия:

import requests
from random import randint

url = 'http://localhost:8080/order'
myobj = [{"power_user": "True"}, {"power_user": "False"}]

for i in range(5):
    requests.post(url, json=myobj[randint(0,1)])
Результат работы программы
Результат работы программы

Как видите, две высокоприоритетные задачи обрабатываются, как и ожидалось. Отлично!

Выводы

В этой статье мы повторили для чего нужен паттерн производитель-потребитель:

  • Балансировка между производителями и потребителями, максимизация использования ресурсов.

  • Развязывание системы, позволяющее производителям и потребителям масштабироваться независимо друг от друга.

Также, на реальном примере увидели, как использовать asyncio.PriorityQueue для решения ситуаций, когда задачам требуется расстановка приоритетов.

Асинхронное программирование на Python - мощный инструмент, а паттерн производитель-потребитель с asyncio.Queue - это универсальный подход к обработке параллелизма и расстановке приоритетов в ваших приложениях.

Контакты автора статьи

Пожалуйста, подпишитесь, если вы нашли статьи полезными, и получайте новые истории на свой почтовый ящик. Если у вас есть вопросы, вы можете найти меня на LinkedIn или в Twitter(X).

Комментарии (0)