При написании приложений с несколькими потоками или процессами нужно помнить о возможности состояния гонки при использовании неатомарных операций. Даже простая задача для увеличения целого числа на единицу в конкурентной программе может вызвать ошибки, с трудом поддающиеся воспроизведению. Но при использовании asyncio мы всегда работаем в одном потоке (если только явно не задействовали средства многопоточной или многопроцессной обработки), значит, можно не беспокоиться о гонках, правда? На самом деле все не так просто!

Некоторые ошибки, встречающиеся в многопоточных и многопроцессных приложениях, исключены в силу однопоточной природы asyncio, но это не совсем так. Вам редко потребуется прибегать к синхронизации при работе с asyncio, однако остаются ситуации, когда с этим нужно что‑то делать. Примитивы синхронизации asyncio могут помочь предотвратить ошибки, свойственные только модели однопоточной конкурентности.

В этой статье вы узнаете, зачем нужны примитивы синхронизации asyncio, а также о лучших практиках использования нескольких примитивов синхронизации. В конце статьи рассмотрим практический пример использования примитивов синхронизации в действии.

readme

Если у вас есть замечения или вы нашли ошибку, пожалуйста, используйте Ctrl+Enter и я исправлю. Спасибо!

Зачем нужны примитивы синхронизации в asyncio?

Любой, кто использовал многопоточность в Python, знает, что несколько потоков используют один и тот же блок памяти. Поэтому, когда несколько потоков одновременно выполняют неатомарные операции над одной и той же областью, возникает проблема потокобезопасности. Поскольку asyncio работает в один поток, не возникает ли у него аналогичных проблем с потокобезопасностью? Ответ: нет.

Параллельные задачи в asyncio выполняются асинхронно, то есть возможно попеременное выполнение нескольких задач во времени. Ошибка параллелизма возникает, когда одна задача обращается к определенной области памяти и ожидает возврата операции ввода-вывода, а другая задача одновременно обращается к этой памяти.

Чтобы избежать подобных ошибок, в Python asyncio введен примитив синхронизации, аналогичный многопоточности. Это asyncio.Lock, asyncio.Semaphore, asyncio.Event и asyncio.Condition. Кроме того, чтобы избежать одновременного обращения к ресурсу слишком большого числа задач, примитивы синхронизации asyncio позволяют защитить ресурс, ограничив число одновременно обращающихся к нему задач.

Примитивы синхронизации в asyncio

asyncio.Lock

Рассмотрим ситуацию. Предположим, у нас есть параллельная задача, которой нужна копия данных сайта. Сначала она проверит, есть ли она в кэше, если есть, то получит ее из кэша, а если нет, то прочитает ее с сайта. Поскольку чтение данных сайта занимает некоторое время для возврата и обновления кэша, при одновременном выполнении нескольких параллельных задач все они предполагают, что этих данных в кэше нет, и запускают удаленные запросы одновременно, как показано в следующем коде:

import asyncio
import aiohttp

cache = dict()


async def request_remote():
    print("Will request the website to get status.")
    async with aiohttp.ClientSession() as session:
        response = await session.get("https://www.example.com")
        return response.status


async def get_value(key: str):
    if key not in cache:
        print(f"The value of key {key} is not in cache.")
        value = await request_remote()
        cache[key] = value
    else:
        print(f"The value of key {key} is already in cache.")
        value = cache[key]
    print(f"The value of {key} is {value}")
    return value


async def main():
    task_one = asyncio.create_task(get_value("status"))
    task_two = asyncio.create_task(get_value("status"))

    await asyncio.gather(task_one, task_two)


if __name__ == "__main__":
    asyncio.run(main())
Обе задачи считают, что в кэше нет данных, и таким образом обращаются к удаленному сайту. Изображение автора.
Обе задачи считают, что в кэше нет данных, и таким образом обращаются к удаленному сайту. Изображение автора.

Это не соответствует нашему первоначальному замыслу, поэтому на помощь приходит asyncio.Lock. Мы можем проверить наличие данных в кэше, когда параллельные задачи должны получить блокировку первыми, а другие задачи, не получившие блокировку, будут ждать.

Пока задача, получившая блокировку, не завершит обновление кэша и не снимет блокировку, остальные задачи могут продолжать выполнение.

Вся блок-схема показана ниже:

Вся блок-схема asyncio.Lock. Изображение автора.
Вся блок-схема asyncio.Lock. Изображение автора.

Давайте поправим наш код:

import asyncio
from asyncio import Lock

import aiohttp

cache = dict()
lock = Lock()


async def request_remote():
    print("Will request the website to get status.")
    async with aiohttp.ClientSession() as session:
        response = await session.get("https://www.example.com")
        return response.status


async def get_value(key: str):
    async with lock:
        if key not in cache:
            print(f"The value of key {key} is not in cache.")
            value = await request_remote()
            cache[key] = value
        else:
            print(f"The value of key {key} is already in cache.")
            value = cache[key]
        print(f"The value of {key} is {value}")
        return value


async def main():
    task_one = asyncio.create_task(get_value("status"))
    task_two = asyncio.create_task(get_value("status"))

    await asyncio.gather(task_one, task_two)


if __name__ == "__main__":
    asyncio.run(main())
Обновлять кэш нужно только в первой задаче. Изображение автора.
Обновлять кэш нужно только в первой задаче. Изображение автора.

asyncio.Semaphore

Иногда нам необходимо получить доступ к ресурсу с ограниченным количеством одновременных запросов. Например, конкретная база данных допускает одновременное открытие только пяти соединений. Или, в зависимости от типа подписки, веб-интерфейс поддерживает только определенное количество одновременных запросов. В этом случае необходимо использовать asyncio.Semaphore. asyncio.Semaphore использует внутренний счетчик, который уменьшается на единицу при каждом получении блокировки Semaphore, пока не достигнет нуля.

Семафор будет ограничивать число одновременно выполняемых задач. Изображение автора.
Семафор будет ограничивать число одновременно выполняемых задач. Изображение автора.

Когда счетчик asyncio.Semaphore равен нулю, другие задачи, которым нужна блокировка, будут ждать. При вызове метода release после выполнения других задач счетчик будет увеличен на единицу. Ожидающие задачи могут продолжить выполнение.

Пример кода выглядит следующим образом:

import asyncio
from asyncio import Semaphore
from aiohttp import ClientSession


async def get_url(url: str, session: ClientSession, semaphore: Semaphore):
    print('Waiting to acquire semaphore...')
    async with semaphore:
        print('Semaphore acquired, requesting...')
        response = await session.get(url)
        print('Finishing requesting')
        return response.status


async def main():
    # Хотя мы запускаем 1000 задач, одновременно будут выполняться только 10 задач.
    semaphore: Semaphore = Semaphore(10)
    async with ClientSession() as session:
        tasks = [asyncio.create_task(get_url("https://www.example.com", session, semaphore))
                 for _ in range(1000)]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

Таким образом, мы можем ограничить количество соединений, к которым можно получить одновременный доступ.

asyncio.BoundedSemaphore

Одна из особенностей семафоров заключается в том, что число вызовов метода release может превышать число вызовов acquire. Если мы всегда используем семафоры в сочетании с блоком async with, то такое невозможно, потому что с каждым acquire автоматически
связывается release. Но если нам требуется более точный контроль над механизмом захвата и освобождения, то возможны проблемы.

Что произойдет, если мы случайно вызовем метод release() несколько раз?

import asyncio
from asyncio import Semaphore


async def acquire(semaphore: Semaphore):
    print("acquire: Ожидание возможности захвата")
    async with semaphore:
        print("acquire: Захвачен...")
        await asyncio.sleep(5)
    print("acquire: Освобождается...")


async def release(semaphore: Semaphore):
    print("release: Одиночное освобождение...")
    semaphore.release()
    print("release: Одиночное освобождение - готово!")


async def main():
    semaphore = Semaphore(2)
    print("Два захвата, три освобождения...")
    await asyncio.gather(asyncio.create_task(acquire(semaphore)),
                         asyncio.create_task(acquire(semaphore)),
                         asyncio.create_task(release(semaphore)))
    print("Три захвата...")
    await asyncio.gather(asyncio.create_task(acquire(semaphore)),
                         asyncio.create_task(acquire(semaphore)),
                         asyncio.create_task(acquire(semaphore)))


if __name__ == "__main__":
    asyncio.run(main())

Здесь мы создаем семафор с пределом 2. Затем дважды вызываем сопрограмму acquire и один раз release, т.е. всего семафор будет освобождён трижды. Первое обращение к gather завершается, по видимости, нормально:

Однако при втором обращении, когда мы захватываем семафор три раза, возникают проблемы – все три захвата происходят сразу! Мы непреднамеренно превысили предел семафора:

Как видно из кода, мы ограничены одновременным выполнением двух задач, но поскольку мы вызвали release более одного раза, то в следующий раз мы сможем выполнить три задачи одновременно. Для решения этой проблемы мы можем использовать asyncio.BoundedSemaphore. Bounded Semaphore - это версия Semaphore, которая вызывает ошибку ValueError в функции release(), если внутренний счетчик увеличивается выше начального значения.

Как мы знаем из исходного кода, при вызове release выдается ошибка ValueError, если значение счетчика больше значения, установленного при инициализации:

import asyncio
from asyncio import BoundedSemaphore


async def main():
    semaphore = BoundedSemaphore(2)

    await semaphore.acquire()
    semaphore.release()
    semaphore.release()


if __name__ == "__main__":
    asyncio.run(main())
При многократном вызове метода release возникает ошибка ValueError. Изображение автора.
При многократном вызове метода release возникает ошибка ValueError. Изображение автора.

Здесь второй вызов release возбудит исключение ValueError, означающее, что мы освободили семафор слишком много раз. Аналогичный результат будет иметь место, если в предыдущем примере использовать BoundedSemaphore вместо Semaphore. Если вы вызываете acquire и release вручную, так что возникает опасность динамически превысить
предел семафора, то лучше работать с BoundedSemaphore, потому что возникшее исключение предупредит об ошибке.

asyncio.Event

Event поддерживает внутреннюю булеву переменную в качестве флага. asyncio.Event имеет три общих метода: wait, set и clear.

Когда задача добегает до event.wait(), она находится в состоянии ожидания. В этот момент можно вызвать event.set(), чтобы установить внутренний маркер в True, и все ожидающие задачи могут продолжить выполнение.

Когда задача завершится, необходимо вызвать метод event.clear(), чтобы сбросить значение маркера в False, вернуть событие в исходное состояние, и можно продолжать использовать событие в следующий раз.

В конце статьи вместо примера кода я покажу, как использовать Event для реализации шины событий.

asyncio.Condition

asyncio.Event хороши, когда нужно просто уведомить о том, что произошло нечто, но бывают ситуации посложнее. Допустим, что по событию требуется получить доступ к разделяемому ресурсу, т.е. захватить блокировку. Или что перед продолжением работы нужно дождаться более сложного сочетания условий, чем простое событие. Или что нужно разбудить не все задачи, а только определенное число. Во всех этих случаях могут выручить asyncio.Condition.

asyncio.Condition похож на asyncio.Lock и asyncio.Event вместе взятые. Сначала мы используем async with, чтобы обеспечить получение блокировки условия, а затем вызываем condition.wait(), чтобы освободить блокировку условия и заставить задачу временно подождать.

Когда condition.wait() проходит, мы возвращаем блокировку условия, чтобы гарантировать одновременное выполнение только одной задачи. Пока задача временно освобождает блокировку и переходит в состояние ожидания по методу condition.wait(), другая задача может либо асинхронизироваться с блокировкой условия и уведомить все ожидающие задачи о необходимости продолжить выполнение по методу condition.notify_all().

Блок-схема показана ниже:

Рабочий процесс asyncio.Condition. Изображение автора.
Рабочий процесс asyncio.Condition. Изображение автора.

Мы можем продемонстрировать действие asyncio.Condition на примере кода:

import asyncio
from asyncio import Condition


async def do_work(condition: Condition):
    print("do_work: захватываю блокировку условия...")
    async with condition:
        print("do_work: Блокировка захвачена, освобождаю и жду выполнения условия...")
        await condition.wait()
        print("do_work: Условие выполнено, вновь захватываю блокировку и начинаю работать...")
        await asyncio.sleep(1)
        print("do_work: Работа закончена, блокировка освобождена.")

        
async def fire_event(condition: Condition):
    await asyncio.sleep(5)
    print("fire_event: захватываю блокировку условия...")
    async with condition:
        print("fire_event: Блокировка захвачена, уведомляю всех исполнителей")
        condition.notify_all()
        print("fire_event: Исполнители уведомлены, освобождаю блокировку.")

        
async def main():
    condition = Condition()
    asyncio.create_task(fire_event(condition))

    await asyncio.gather(do_work(condition), do_work(condition))


if __name__ == "__main__":
    asyncio.run(main())

Иногда нам требуется, чтобы asyncio.Condition ждал наступления определенного события, прежде чем перейти к следующему шагу. Мы можем вызвать метод condition.wait_for() и передать в качестве аргумента метод. При каждом вызове condition.notify_all метод condition.wait_for проверяет результат выполнения метода-параметра и завершает ожидание, если он равен True, или продолжает ожидание, если он равен False.

condition.wait_for мы можем продемонстрировать на примере. В следующем коде мы смоделируем подключение к базе данных. Перед выполнением SQL-оператора код проверит, инициализировано ли соединение с базой данных, и выполнит запрос, если инициализация соединения завершена, или подождет, пока соединение не завершит инициализацию:

import asyncio
from asyncio import Condition
from enum import Enum


class ConnectionState(Enum):
    WAIT_INIT = 0
    INITIALING = 1
    INITIALIZED = 2


class Connection:
    def __init__(self):
        self._state = ConnectionState.WAIT_INIT
        self._condition = Condition()

    async def initialize(self):
        print("initialize: Preparing initialize the connection.")
        await self._change_state(ConnectionState.INITIALING)
        await asyncio.sleep(5)
        print("initialize: Connection initialized")
        await self._change_state(ConnectionState.INITIALIZED)

    async def execute(self, query: str):
        async with self._condition:
            print("execute: Waiting for connection initialized")
            await self._condition.wait_for(self._is_initialized)
            print(f"execute: Connection initialized, executing query: {query}")
            await asyncio.sleep(5)
            print("execute: Execute finished.")

    async def _change_state(self, state: ConnectionState):
        print(f"_change_state: Will change state from {self._state} to {state}")
        self._state = state
        print("_change_state: Change the state and notify all..")
        async with self._condition:
            self._condition.notify_all()

    def _is_initialized(self):
        if self._state is not ConnectionState.INITIALIZED:
            print("_is_initialized: The connection is not initialized.")
            return False
        print("_is_initialized: The connection is ready.")
        return True


async def main():
    connection = Connection()
    task_one = asyncio.create_task(connection.execute("SELECT * FROM table"))
    task_two = asyncio.create_task(connection.execute("SELECT * FROM other_table"))

    asyncio.create_task(connection.initialize())
    await asyncio.gather(task_one, task_two)


if __name__ == "__main__":
    asyncio.run(main())

asyncio.Barrier(добавлен в Python 3.11)

Объект Barrier - представляет собой объект. Непотокобезопасен. Это простой примитив синхронизации, позволяющий блокировать до тех пор, пока на нем не будет ожидать определенное количество задач. Задачи могут ожидать метод wait() и будут блокироваться до тех пор, пока заданное количество задач не закончит ожидание wait(). В этот момент все ожидающие задачи одновременно разблокируются.

Можно использовать async with как альтернативу ожиданию при вызове Barrier.wait(). Класс asyncio.Barrier можно использовать повторно любое количество раз.

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Да, это пример из докуметации, в статье этого не было. По этому классу пока не нашёл более подробных примеров. Буду добавлять информацию по мере возможности.

Некоторые советы по использованию примитивов синхронизации asyncio

Не забывайте использовать таймаут или отмену при необходимости

При использовании примитивов синхронизации мы, как правило, ожидаем завершения определенной операции ввода-вывода. Однако из-за флуктуаций в сети или по другим неизвестным причинам операция ввода-вывода в задаче может выполняться дольше, чем в других.В этом случае необходимо установить таймаут для операции, чтобы при слишком большом времени выполнения освободить блокировку и дать возможность другим задачам выполнить ее вовремя.

В другом случае мы можем перебирать задачи в цикле. Это может заставить некоторые задачи ждать в фоновом режиме и помешать правильному завершению программы. В этом случае не забудьте воспользоваться командой cancel, чтобы прервать циклическое выполнение задачи.

Избегайте использования примитивов синхронизации или блокировки только наименьшего количества ресурсов

Все мы знаем, что преимущество asyncio заключается в том, что задача может переключаться на выполнение другой задачи в ожидании возврата IO. Но задача asyncio часто содержит как операции, связанные с IO, так и операции, связанные с процессором. Если заблокировать на задаче слишком много кода, то она не сможет вовремя переключиться на другую задачу, что скажется на производительности. Поэтому, если в этом нет необходимости, старайтесь не использовать примитивы синхронизации или блокировать только наименьшее количество ресурсов.

Чтобы избежать ситуаций с конкурентной блокировкой

В asyncio нет RLock, поэтому не используйте блокировки в рекурсивном коде. Как и в случае с многопоточностью, в asyncio также существует вероятность возникновения тупиковых ситуаций, поэтому старайтесь избегать одновременного использования нескольких блокировок.

Пример: Шина событий на базе Asyncio

Надеюсь к этому моменту вы смогли разобраться и понять как работую примитивы синхронизации в Asyncio. Давайте рассмотрим более пракичный пример, рассмотрев реализацию шины событий. Как обычно, первым шагом является проектирование EventBus API.

import asyncio
from asyncio import Event
import inspect
from typing import Callable


class EventBus:
    def __init__(self):
        self._event_dict = dict()

    async def on(self, event_name: str, fn: Callable):
        PASS

    def trigger(self, event_name: str, *args, **kwargs):
        PASS

    def _get_event(self, event_name: str):
        PASS

Давайте реализуем эти методы. Поскольку EventBus взаимодействует с помощью строк, а внутри я намерен использовать asyncio.Event для реализации событий, соответствующих каждой строке, начнем с реализации метода _get_event:

def _get_event(self, event_name: str):
        if event_name in self._event_dict:
            print("event already inited...")
            event = self._event_dict.get(event_name)
        else:
            print(f"need to init a new event for {event_name}")
            event = Event()
            self._event_dict[event_name] = event
        return event

Метод on привязывает функцию обратного вызова к определенному событию:

async def on(self, event_name: str, fn: Callable):
        event = self._get_event(event_name)
        while True:
            await event.wait()
            print("event fired")
            result = fn(*event.args, **event.kwargs)
            if inspect.isawaitable(result):
                await result
            
            # Since the callback function is likely a synchronous method, 
            # we must perform an await here to allow other tasks to execute.
            await asyncio.sleep(0.1)
            event.clear()

Метод trigger может вручную вызвать событие и передать соответствующие данные:

def trigger(self, event_name: str, *args, **kwargs):
    event = self._get_event(event_name)

    event.args = args
    event.kwargs = kwargs
    event.set()

Наконец, напишем метод main, чтобы проверить действие EventBus:

def a_sync_callback(data):
    print(f"A sync callback with data {data} is triggered")


async def a_async_callback(data):
    await asyncio.sleep(1)
    print(f"A async callback with data {data} is triggered")


async def main():
    event_bus = EventBus()
    task_one = asyncio.create_task(event_bus.on("some_event", a_async_callback))
    task_two = asyncio.create_task(event_bus.on("some_event", a_sync_callback))

    event_bus.trigger("some_event", {id: 1})
    await asyncio.wait([task_one, task_two], timeout=20.0)


asyncio.run(main())
  

В конце метода main не забудьте использовать таймаут, чтобы программа не выполнялась постоянно, как я уже предупреждал.

Код выполняется, как и ожидалось. Изображение автора.
Код выполняется, как и ожидалось. Изображение автора.
Полный листинг кода программы
import asyncio
from asyncio import Event
import inspect
from typing import Callable


class EventBus:
    def __init__(self):
        self._event_dict = dict()

    def _get_event(self, event_name: str):
        if event_name in self._event_dict:
            print("event already inited...")
            event = self._event_dict.get(event_name)
        else:
            print(f"need to init a new event for {event_name}")
            event = Event()
            self._event_dict[event_name] = event
        return event

    async def on(self, event_name: str, fn: Callable):
        event = self._get_event(event_name)
        while True:
            await event.wait()
            print("event fired")
            result = fn(*event.args, **event.kwargs)
            if inspect.isawaitable(result):
                await result

            # Поскольку функция обратного вызова, скорее всего, является синхронным методом,
            # мы должны выполнить здесь await, чтобы дать возможность другим задачам выполниться.
            await asyncio.sleep(0.1)
            event.clear()

    def trigger(self, event_name: str, *args, **kwargs):
        event = self._get_event(event_name)

        event.args = args
        event.kwargs = kwargs
        event.set()


def a_sync_callback(data):
    print(f"A sync callback with data {data} is triggered")


async def a_async_callback(data):
    await asyncio.sleep(1)
    print(f"A async callback with data {data} is triggered")


async def main():
    event_bus = EventBus()
    task_one = asyncio.create_task(event_bus.on("some_event", a_async_callback))
    task_two = asyncio.create_task(event_bus.on("some_event", a_sync_callback))

    event_bus.trigger("some_event", {id: 1})
    await asyncio.wait([task_one, task_two], timeout=20)

asyncio.run(main())

Выводы

В этой статье сначала было рассказано о том, зачем Python asyncio нужны примитивы синхронизации. Затем я представил лучшие практики для Lock, Semaphore, Event и Condition и дал несколько советов по их правильному использованию.

Наконец, я привёл небольшой пример с практическим применением примитивов синхронизации asyncio, который, я надеюсь, поможет вам лучше использовать примитивы синхронизации в реальных проектах.

Не стесняйтесь комментировать, делиться опытом или обсуждать со мной темы, связанные с asyncio.

Контакты автора статьи

Пожалуйста, подпишитесь, если вы нашли статьи полезными, и получайте новые истории на свой почтовый ящик. Если у вас есть вопросы, вы можете найти меня на LinkedIn или в Twitter(X).

Комментарии (1)


  1. pgubarev
    17.10.2023 15:10
    +1

    Спасибо за статью, читается круто.
    Пример с asyncio.Lock по идее ставит все обращения в очередь, тогда получается что мы не только обновляем данные в кеше последовательно, но и читаем, теряется фактически смысл кеша, так как кэш будет блокирующей частью