В этой статье мы обсудим паттерн "Cancellation Token" (или по-русски - "токен отмены"), популярный в некоторых других языках, но почему-то обойденный вниманием в Python-сообществе. Он о том, как безопасно и красиво завершать работу функции, треда или корутины.

Эта статья — уже третья в серии про многопоточное программирование на Python. Предыдущая была про защиту от дедлоков, но читать её для понимания этой не обязательно. Вся серия предназначена для программистов, знакомых с базовыми концепциями многопоточного программирования.

Да кто такой этот ваш токен отмены?

Программы бывают похожи на звезд шоу-бизнеса или политиков: правильно определить время для ухода со сцены им тоже очень непросто. У вас в программе может быть предусмотрено множество ограничений на ее работу: одно ее действие должно длиться не дольше, чем n секунд, другое должно повторяться до тех пор, пока не что-то там, третье если уж прерывается, то должно прерывать и остальных, и так далее и тому подобное.

Чтобы это все упростить, придуман паттерн "Токен отмены". Его идея проста. Каждый раз, когда мы начинаем какое-то действие, занимающее много времени, мы передаем исполнителю - будь то функция, тред или корутина - специальный объект, у которого исполнитель будет периодически уточнять, стоит еще продолжать работу, или пора бы уже сворачиваться.

Этот объект (дальше будем звать его "токен"):

  • Можно отменить, вызвав у него метод cancel();

  • Может отменить сам себя по какому-то условию, например по истечению таймаута;

  • Может быть вложен в другой токен и отменит его, если будет отменен сам.

Когда объект "отменен", это значит, что исполнитель больше не должен продолжать работу. Он должен лишь регулярно опрашивать токен, но ему больше не нужно самостоятельно отслеживать многочисленные причины, почему ему может потребоваться завершить работу.

Как вы уже, вероятно, поняли из описания выше, код с использованием данного паттерна становится:

  • Удобнее для тестирования. Ведь теперь ограничения по типу таймаутов или отслеживания каких-то внешних условий больше не захардкожены, а передаются туда извне.

  • Красивее и компактнее.

Токены стоит применять для любых потенциально долгих и дорогостоящих операций, а также для тех, чья продолжительность заранее неизвестна и может потребоваться их прервать в любой момент.

Особенно это ценно в контексте многопоточных вычислений. Токены являются удобной абстракцией, скрывающей за единообразным фасадом совершенно разные причины завершения, среди которых могут быть как вычисляемые автономно (сколько времени прошло со старта операции?), так и сообщаемые извне (например, когда пользователь нажал кнопку отмены), в том числе из других потоков. При этом поток завершается "мягко", ведь будучи "отмененным", он все еще может спокойно доделать все свои дела и завершиться корректно.

В некоторых других языках паттерн доступен "из коробки" - прямо в стандартной библиотеке. В Python-комьюнити для него не было устоявшегося названия, поэтому я взял его из C#. А вот в Go те же токены называются уже контекстами (правда там их возможности несколько шире). Для сообщества Python я предлагаю собственную реализацию - библиотеку cantok, все последующие примеры и их обсуждение в этой статье будут крутиться вокруг нее. Если вы захотите повторить приводимые здесь примеры кода, установите библиотеку с помощью команды:

pip install cantok

Быстрый пример

Вот код:

from random import randint
from cantok import ConditionToken, CounterToken, TimeoutToken


token = TimeoutToken(1) + ConditionToken(lambda: randint(1, 100_000) == 1984) + CounterToken(400_000, direct=False)
counter = 0

while token:
  counter += 1

print(counter)

Что мы здесь видим? Есть некоторый цикл. И есть какой-то токен. Пока этот токен эквивалентен True (приведение происходит автоматически при использовании переменных в качестве условия цикла) - цикл будет продолжаться. По умолчанию он равен True, пока не найдется какое-то условие, чтобы его завершить. Условий здесь три:

  1. С начала операции прошло более 1 секунды (за это отвечает TimeoutToken(1));

  2. Произошло крайне редкое событие с вероятностью 1 к 100 000 (ConditionToken(lambda: randint(1, 100_000) == 1984));

  3. Цикл накрутил более 400 000 оборотов (CounterToken(400_000, direct=False)).

Токен, который мы используем в качестве условия цикла, является "суммой" трех других токенов. Такая сумма тоже является токеном, который объединяет все ограничения, описанные его составляющими. В сложных разветвленных программах это должно работать так, что чем дальше по стеку вызовов проходит токен, тем большее количество ограничений он должен в себе заключать.

Подробности о поведении токенов, а также об их видах - читайте дальше.

Как отменить токен или узнать его состояние

По умолчанию любой токен пребывает в активном состоянии. Это значит, что приведение его к bool дает True:

from cantok import SimpleToken

print(bool(SimpleToken()))  # True

У каждого токена есть метод cancel(). После его вызова токен "отменяется", то есть к bool он всегда приводится как False.

token = SimpleToken()
print(bool(token))  # True
token.cancel()
print(bool(token))  # False

Если вам так привычнее, вы можете получать статус токена из динамически вычисляемого атрибута cancelled, только значения там будут инвертированы:

token = SimpleToken()
print(token.cancelled)  # False
token.cancel()
print(token.cancelled)  # True

Если вы не фанат динамических атрибутов и прочей магии, есть также метод is_cancelled(), который делает то же самое:

token = SimpleToken()
print(token.is_cancelled())  # False
token.cancel()
print(token.is_cancelled())  # True

Еще есть метод keep_on(), который делает то же самое, что и приведение к bool:

token = SimpleToken()
print(token.keep_on())  # True
token.cancel()
print(token.keep_on())  # False

Последний способ, но не по значимости - метод check(). В отличие от прочих перечисленных в этом разделе, он не возвращает bool-значение, а поднимает исключение, если токен отменен. Подробнее об этом будет где-то дальше, а пока просто знайте - иногда это крайне удобная фича:

token = SimpleToken()
token.check()
token.cancel()
token.check()  # cantok.errors.CancellationError: The token has been cancelled.

Что из этого всего использовать - решать вам, в сущности все это - разные способы делать одно и то же. Автору больше всего по душе лаконичность приведения к bool, позволяющая проделывать что-то вроде:

while token:
  ...

Виды токенов

Библиотека предоставляет всего 4 типа токенов:

  1. Простой: SimpleToken;

  2. Автоматически отменяемый по таймауту TimeoutToken;

  3. Автоматически отменяемый по произвольному условию ConditionToken;

  4. Отменяемый по количеству обращений CounterToken.

Обсудим подробнее каждый тип.

SimpleToken можно отменить только "вручную", то есть вызвав у него метод cancel(). У него простейший конструктор без обязательных аргументов:

from cantok import SimpleToken

token = SimpleToken()

При создании TimeoutToken в конструктор передается число - количество секунд, по истечению которых токен будет отменен:

from cantok import TimeoutToken

token = TimeoutToken(5)

... или:

token = TimeoutToken(5.0)

ConditionToken первым аргументом принимает функцию, которая должна отвечать на вопрос "отменен ли токен?":

from random import randint
from cantok import ConditionToken

token = ConditionToken(lambda: randint(1, 100) in range(51))  # Кстати, с помощью токенов легко построить онлайн-казино.
print(bool(token))

CounterToken - самый неоднозначный из токенов, пользуйтесь им только если точно уверены, что понимаете, как он работает. По умолчанию я НЕ рекомендую им пользоваться, несмотря на то, что в некоторых случаях это может казаться очень удобным. Что он делает? Он считает количество запросов своего статуса. Примерно так:

from cantok import CounterToken

counter = 0
token = CounterToken(5)

while token:
  counter += 1

print(counter)  # 5

В чем проблема с этим токеном и почему стоит избегать его использования? Главная проблема: он не является контексто-независимым. Получив такой токен в качестве аргумента функции, вы не можете быть уверены, можно ли передать его дальше по стеку вызовов, поскольку не очевидно, на подсчет чего он настроен. Когда вы пишете функцию, вы должны знать контекст, в котором она была вызвана, а значит этот токен в действительности не уменьшает, а увеличивает степень сцепленности компонентов вашего кода. Зачем же я тогда включил этот токен в библиотеку? Иногда написать код так тупо быстрее и короче. Ну а дальше решать уже вам: делать все правильно или побыстрее.

И последнее, все токены - наследники AbstractToken, а значит AbstractToken можно использовать, к примеру, для подсказок типов или для проверок через isinstance:

from cantok import AbstractToken

def function(token: AbstractToken):
  ...

Итак, теперь мы знаем всё о типах токенов и готовы к интересным экспериментам.

Вкладываем токены друг в друга

Токены можно вкладывать друг в друга. Если в функцию в качестве аргумента прилетел токен, она может вложить его в другой токен с каким-то другим ограничением и передать в какую-то другую функцию.

Токен может иметь сложную внутреннюю структуру
Токен может иметь сложную внутреннюю структуру

Для этого при создании нового токена нужно перечислить все вкладываемые токены в его конструкторе сразу после обязательного аргумента. Для SimpleToken это будет выглядеть как-то так, ведь у него нет обязательных аргументов:

token = SimpleToken(TimeoutToken(1), ConditionToken(lambda: randint(1, 100) in range(51)))

А для TimeoutToken примерно так:

token = TimeoutToken(5, SimpleToken(), ConditionToken(lambda: randint(1, 100) in range(51)))

Как вы поняли, любой токен в действительности представляет из себя дерево, и его состояние по умолчанию, когда вложенных токенов нет - всего лишь вырожденный случай. Любой из вложенных в данный токен токенов автоматически его отменяет, будучи отмененным сам. Да славятся деревья, они классные.

Складываем их

Токены можно складывать. Результатом сложения двух разных токенов является всегда является SimpleToken, в который вложены оба слагаемых.

Проделаем это упражнение:

print(repr(TimeoutToken(5) + TimeoutToken(10)))
# SimpleToken(TimeoutToken(5, monotonic=False), TimeoutToken(10, monotonic=False))

Эта фича крайне удобна и позволяет легко накидывать новые ограничения на токен, прежде чем передать его, к примеру, в другую функцию в качестве аргумента. Вам не нужно помнить названия методов, просто складывайте токены и радуйтесь жизни.

Работаем с исключениями

Выше я уже упоминал метод check(), который поднимает исключение, если токен отменен. Предлагаю посмотреть на него чуть пристальнее.

Есть базовый случай, повторю его здесь:

token = SimpleToken()
token.check()
token.cancel()
token.check()  # cantok.errors.CancellationError: The token has been cancelled.

Мы создаем токен и вызываем у него метод check(). Сначала, пока токен не отменен - ничего не происходит. А когда отменен - поднимается cantok.errors.CancellationError. Однако это не единственное исключение, которое может подняться. У каждого токена кроме SimpleToken есть специальное исключение, отнаследованное от cantok.errors.CancellationError, которое поднимается только когда токен отменен по ограничению конкретного типа, свойственному конкретному токену:

from time import sleep
from cantok import TimeoutToken

token = TimeoutToken(1)
sleep(1)
token.check()  # cantok.errors.TimeoutCancellationError: The timeout of 1 seconds has expired.
from cantok import ConditionToken

token = ConditionToken(lambda: True)
token.check()  # cantok.errors.ConditionCancellationError: The condition is not met.
from cantok import CounterToken

token = CounterToken(1)
token.check()
token.check()  # cantok.errors.CounterCancellationError: After 1 attempts, the counter was reset to zero.

Любой из этих токенов, будучи отмененным вызовом метода cancel(), поднимал бы cantok.errors.CancellationError, то есть, повторюсь, специальное типизированное исключение будет поднято только в случае отмены по причине, соответствующей ограничению токена. В сообщении каждого исключения вы можете на человеческом языке прочитать причину, почему конкретный токен был отменен.

Еще у каждого исключения всегда есть атрибут token, в котором хранится конкретный токен, из-за которого было возбуждено исключение. Это может быть полезно на случай сложного разветвленного дерева токенов, когда вам по какой-то причине интересно, какой именно токен послужил причиной отмены. Учитывайте, что поиск конкретного отмененного токена в дереве происходит в глубину и он прерывается после первого найденного отмененного токена, то есть, если их несколько - вы узнаете только об одном, который при обходе дерева попался первым. Пример:

from cantok import SimpleToken, CancellationError

first_token = SimpleToken()
second_token = SimpleToken(first_token)

first_token.cancel()

try:
    second_token.check()  # cantok.errors.CancellationError: The token has been cancelled.
except CancellationError as e:
    print(e.token is first_token)  # True

Отлавливать вы можете как исключения конкретных типов, если вам это почему-то нужно, так и общего типа, от которого они все отнаследованы - cantok.errors.CancellationError. Общий тип импортируется так:

from cantok import CancellationError

Остальные по аналогии, отличаются только имена исключений:

from cantok import TimeoutCancellationError, ConditionCancellationError, CounterCancellationError

Также вы можете отдельно не импортировать исключения, поскольку класс каждого исключения хранится в атрибуте exception соответствующего ему класса токена:

print(SimpleToken.exception)  # <class 'cantok.errors.CancellationError'>
print(TimeoutToken.exception)  # <class 'cantok.errors.TimeoutCancellationError'>
print(ConditionToken.exception)  # <class 'cantok.errors.ConditionCancellationError'>
print(CounterToken.exception)  # <class 'cantok.errors.CounterCancellationError'>

То есть вы можете делать что-то вроде:

token = TimeoutToken(1)
sleep(1)

try:
    token.check()  # cantok.errors.TimeoutCancellationError: The timeout of 1 seconds has expired.
except TimeoutToken.exception:
    ...

Немного практики

Давайте представим, что нам кровь из носу нужно написать функцию, которая будет стучаться по какому-то URL до тех пор, пока не получит какой-то результат. Функция может быть запущена из другого потока и ее должно быть можно оттуда же отменить. Можете попробовать написать такое сами, а вот версия автора:

import logging
from typing import Optional, Union
from cantok import AbstractToken, SimpleToken, TimeoutToken, CancellationError
import requests

def try_until(
    address: str,
    timeout_per_request: Optional[Union[int, float]] = None,
    token: Optional[AbstractToken] = None,
) -> str:
    """
    The function returns the result of a get request to the specified address.

    Attempts to make a request will be repeated until the data is received.
    If you want to pass additional restrictions on the execution time, pass a cancellation token.
    Specify timeout_per_request if you want to be sure that each request is time-limited.

    The maximum running time of the function may slightly exceed the sum of the time 
    prescribed by the cancellation token and timeout_per_request.
    """
    token = SimpleToken() if token is None else token

    if timeout_per_request is not None:
        if timeout_per_request < 0:
            raise ValueError('The timeout cannot be less than zero.')
        request_parameters = {'timeout': timeout_per_request}
    else:
        request_parameters = {}

    while token:
        try:
            return requests.get(address, **request_parameters).text
        except Exception:
            logging.exception(f'Error in the access process at address "{address}". Maybe try again?')

    try:
        token.check()
    except CancellationError as e:
        logging.exception(f'The access operation at address "{address}" has been canceled.')
        raise e


token = TimeoutToken(5)

print(try_until('https://www.google.com/', timeout_per_request=1))  # Должно распечататься содержимое главной страницы одной доброй корпорации.

Обратите внимание, код внутри try_until ничего не знает о том, какие ограничения на него наложены. Таймаут (5 секунд) определяется снаружи функции, а не внутри, а значит ее становится значительно легче тестировать, да и писать ее проще, ведь не нужно повсюду вставлять специфические проверки. Конкретная причина отмены, если таковая будет иметь место, отобразится в логах.

Статья отменена, завершаемся

Итак, мы узнали, что такое токены отмены и почему это круто. И теперь у нас, питонистов, стало на 1 повод для стыда меньше в сравнении с пользователями некоторых других языков. Спасибо за внимание и приходите еще.

Комментарии (9)


  1. danilovmy
    21.11.2023 13:41
    +2

    Я не понял, а где timeout в requests в завершающем примере. Нигде? Тогда: If no timeout is specified explicitly, requests do not time out. И уже не важно какой там токен. Сидим ждём. Ну и пока сидим и ждём, переписываем с реализацией настоящего прекращения по timeout.

    После того, как перепишем, проверяем реализацию. Поскольку параметр timeout в requests не учитывает время после начала передачи данных, и есть риск опять сидеть и ждать.


    1. pomponchik Автор
      21.11.2023 13:41
      +1

      Мне понравилось это замечание, таймаут на запрос как-то упустил в процессе подготовки статьи. Поправил пример кода.


  1. turboslon
    21.11.2023 13:41

    Правильно я понимаю, что с multiprocessing работать не будет?


    1. pomponchik Автор
      21.11.2023 13:41

      Верно.


      1. vda19999
        21.11.2023 13:41

        Видимо, если вы в другой процесс передадите токен, то его не удастся отменить, но по таймауту-то отмена сработает?


        1. pomponchik Автор
          21.11.2023 13:41
          +1

          Он не сериализуется для передачи другому процессу. Но даже если бы да, я бы не рекомендовал это использовать, инструмент предназначен только для многопоточки.

          Вообще, я подумываю над тем, чтобы добавить поддержку процессов. Хотя это с высокой вероятностью придется выносить в отдельный инструмент.


  1. baldr
    21.11.2023 13:41
    +1

    Было бы удобно завести какой-нибудь токен для asyncio и его друзей.

    Например, создаем токен и делаем await на него, после cancel он возвращает управление. Сейчас для таких целей использую asyncio.Event, но ваши токены выглядят удобнее.

    В первую очередь хочется избавиться от конструкций вроде таких:

    await asyncio.wait([e.wait() for e in stop_events], timeout=10, return_when=asyncio.FIRST_COMPLETED)


    1. pomponchik Автор
      21.11.2023 13:41
      +1

      Да, это хорошая идея, я добавил такую возможность для всех токенов, можете попробовать.


      1. baldr
        21.11.2023 13:41

        Спасибо, но у меня есть несколько замечаний.

        Во-первых, мне кажется, что введение метода wait только для асинхронных операций - не очень красиво. И не очень очевидно. Я бы предложил реализовать эту фичу и для синхронного режима тоже. Варианты: методы wait/wait_async, что-то вроде аналога asgiref.sync.sync_to_async или вообще декоратор.

        Кстати, вместо `await token.wait()` может быть сделать просто `await token` ?

        Во-вторых, ваша реализация с помощью while/sleep(0.0001) мне кажется очень неправильной. Смысл asyncio в том чтобы освобождать процессор, а вы заставляете его переключаться в вашу проверку и заново проверять все условия (втч синхронные) - каждый цикл эвентлупа. Простите, но уж лучше я останусь со своим вариантом. Должен сказать, что не могу сразу предложить вариант лучше. Возможно имеет смысл ввести отдельный асинхронный токен. И, возможно, не разрешать его смешивать с синхронными (по крайней мере поначалу). Поскольку, например ваша run_function теоретически тоже может пригодиться быть асинхронной.

        Кстати, ваш CounterToken в этом методе лучше вообще принудительно выключать.

        Замечание по документации, на которую вы дали ссылку - вы же в курсе что у вас код синхронный в примере? Несмотря на await, все строки кода будут выполняться последовательно. Метод asyncio.create_task подошел бы лучше для вызова do_something.

        Безо всей этой асинхронной ерунды ваша реализация выглядит красиво. Не рассматривайте мои предложения как требования - может быть и оставить всё это синхронным..