Любая задача может быть завершена несколькими способами:
  1. Нормальное завершение, после которого мы получаем результат (например, Ничего/Единица).
  2. Сообщение об ошибке.
  3. Принудительное завершение, после которого программа может остаться в неопределённом состоянии.

В корутинах Python есть известные точки для безопасной приостановки выполнения. Пользуясь ими, можно автоматически подводить программу к результатам #1 и #2. Поскольку приостановка потоков происходит по принципу вытеснения, именно программист, реализующий потоки, отвечает за достижение результатов #1 и #2. При работе с потоками всегда сохраняется вероятность исхода #3 – поток может быть уничтожен извне, а задача этого даже не заметит, и не получит ни одного шанса осуществить очистку после выполнения и аккуратно завершиться. С потоками существуют две фундаментальные проблемы: (A) невозможно узнать, когда их работа может быть прервана из-за переключения контекста в операционной системе и (B) никогда не угадаешь, когда поток может быть принудительно завершён. Поэтому при работе с потоками принято писать код так, чтобы A и B не имели значения. При конкурентной обработке это неприемлемо, поэтому, если речь идёт о конкурентности, то вариант #3 мы должны полностью исключить.

При работе с корутинами переключение контекста обычно происходит безопасно (да, даже в Python и Go по-прежнему можно написать код, приводящий к условиям гонки, но в Rust это предотвращается). В корутинах Python есть встроенная обработка отмены (о которой рассказано здесь), причём, как оказывается, это свойство кода тесно связано с тем, насколько он приспособлен к обработке ошибок.

Когда мы создаём комплект задач, выполняемых «одновременно» (в разных языках/системах эта фраза может значить не одно и то же), все эти конкурентно работающие задачи могут и ошибки выдавать «одновременно».

Тогда возникает вопрос: как и когда будем проверять ошибки? Если мы одновременно выполняем множество задач, то что произойдёт, если одна из этих задач выдаст ошибку? Мы же не хотим, чтобы одна ошибка как-нибудь затёрла другую или как-нибудь получила приоритет над другими ошибками.

Так что же должно произойти с другими задачами, если в одной из задач произойдёт ошибка? Если мы имеем отказ в одной из задач, то не можем позволить себе полагаться на какие-либо результаты, выдаваемые другими задачами. Можем либо позволить этим задачам работать до завершения, снять их результаты и далее предоставить программисту решать, что же делать, исходя из результата и из сути ошибки – так, библиотека tokio из Rust делает это по умолчанию. Можно также предположить, что все результаты прочих задач бесполезны и все их отменить – так принято делать, работая с Python-библиотекой asyncio, поскольку в ней есть встроенная система отмены.

Обратите внимание, что в вышеизложенном неявно предполагается следующее: есть группа задач, которые начинают выполняться вместе, и у каждой из которых есть некая точка завершения. По завершении выполнения вы сможете обработать условия возможных ошибок сразу для всей группы. Это важно, поскольку у программиста должен быть способ обрабатывать ошибки с минимально возможным уровнем детализации – то есть, максимально близко к той точке, в которой ошибка произошла. Дело в том, что именно в этой точке вы располагаете самой полной информацией и можете принимать наиболее точные решения. Речь не о том, что вы вынуждены работать над ошибками при минимальном уровне детализации, а в том, что можете.

В традиционных конкурентных системах, основанных на работе с потоками, задачи обычно понимаются как действия, выполняемые «по одному за раз». Но практически всегда приходится создавать задачи в большом количестве (или же не получится воспользоваться преимуществами конкурентности). Обработка этих задач как единой тесно связанной группы – это основа структурированной конкурентности. Здесь вспоминается, как мы постепенно эволюционировали от беспорядочных скачков по программе до строгого соблюдения работы с функциями. Структурированная конкурентность позволяет очертить конечную область применения для группы корутин – точно так же, как функция создаёт область видимости для фрагмента кода. Ограничения, налагаемые на работу с этой областью видимости, помогают подробнее разобраться, как именно функционирует код. Программист поднимается над ситуацией и может не беспокоиться о мелких деталях – всё это способствует продуктивности и надёжности в работе. Классическое введение в работу со структурированной конкурентностью приведено здесь.

В языках, не поддерживающих структурированную конкурентность, приходится использовать своего рода «объединение», чтобы заложить такую точку завершения, в которой могут закончить выполняться все задачи, де-факто создав «область видимости» (как правило, такое «объединение» можно организовать несколькими способами). При использовании структурированной конкурентности такая область видимости создаётся явно, и поэтому в программе возникает точка завершения. Из этой точки можно возвращать сущности, в которых содержатся результаты и ошибки, поступившие от отдельно взятых задач. Область видимости, обеспечиваемая при структурированной конкурентности, даёт минимальный уровень детализации для группы задач.

Rust-подход


Поскольку в Rust для захвата ошибок используется тип Return, исключения здесь не нужны. Так и задумано, ведь исключения – это побочные эффекты, а Return в данном случае вынуждает вас обрабатывать ошибки именно там, где они происходят. Это и есть минимальный уровень детализации.

В следующем примере воспользуюсь крейтом thiserror, так как хочу получить типы ошибок, которые позволили бы сымитировать те, что будут у меня в последующей программе на Python. Как обычно, по умолчанию буду пользоваться библиотекой корутин tokio, поскольку она с отрывом лидирует как наиболее употребительная.

use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::Mutex;
use tokio::time::sleep;
use FallibleError::*;

#[derive(Error, Debug)]
pub enum FallibleError {
    #[error("V[{0}]")]
    ValueError(i32),
    #[error("T[{0}]")]
    TabError(i32),
    #[error("A[{0}]")]
    AttributeError(i32),
}

async fn fallible(
    i: i32,
    stdout: Arc<Mutex<()>>,
) -> Result<char, FallibleError> {
    sleep(Duration::from_millis(100)).await;
    {
        // В любой момент вывод может поступать только от одной задачи:
        let _lock = stdout.lock().await;
        println!("fallible({})", i);
    } // блокировка высвобождена

    match i {
        1 => Err(ValueError(i)),
        3 => Err(TabError(i)),
        5 | 6 => Err(AttributeError(i)),
        // 7 => panic!("i:{} panicked!", i),
        _ => {
            sleep(Duration::from_secs(3)).await;
            Ok((b'a' + i as u8) as char)
        }
    }
}

#[tokio::main]
async fn main() {
    // Не даёт задачам наслаиваться на вывод std:
    let stdout = Arc::new(Mutex::new(()));

    let tasks: Vec<_> = (0..8)
        .map(|i| {
            tokio::spawn(fallible(i, stdout.clone()))
        })
        .collect();

    // Задачи ещё не начали работу: соответственно, нет состязания 
    println!("Tasks created");

    // Запускаем все задачи и выполняем их до завершения:
    let results: Vec<_> =
        futures::future::join_all(tasks).await;

    for result in results.iter() {
        // Корутины закончили работу: блокировка здесь не требуется
        print!("{:?} => ", result);
        match result {
            Ok(Ok(l)) => println!("Letter: {}", l),
            Ok(Err(e)) => println!("Err: {}", e),
            Err(p) => println!("Panic: {:?}", p),
        }
    }
}

Крейт thiserror позволяет с лёгкостью создавать типы ошибок, в том числе, многочисленные варианты форматирования для оформления результатов ошибок. Пожалуй, FallibleError – простейший вариант применения этого инструмента.

Функция fallible() пытается принять свой аргумент i32 и преобразовать его в char. В таком случае отказы приводят к различным типам FallibleError. Я бы добавил сюда sleep(), чтобы стало ещё интереснее.

Стандартный вывод – это разделяемый ресурс. Соответственно, если множество корутин пишет в стандартный вывод, то они могут толкаться друг с другом, и их вывод будет перемежаться. Чтобы этого избежать, создаём Mutex под названием stdout и требуем: каждая задача, собирающаяся писать в стандартный вывод, сначала должна принять эту блокировку. Поскольку stdout (создаваемый в самом начале main()) совместно используется всеми задачами, его необходимо обернуть в Arc, так, чтобы его можно было клонировать.

Функция main() создаёт вектор Vec задач fallible(). Задачи Rust не начинают выполняться, пока не будут опрошены, поэтому мы увидим сообщение “Tasks created” (Задачи созданы) ещё до того, как задачи начнут работать (и этот вывод в защите не нуждается). Вызов join_all() как запускает задачи, так и доводит их все до завершения.

Как только задачи завершены, они перестают конкурировать за стандартный вывод, поэтому отображать результаты можно без применения мьютексов.

При окончательном сопоставлении с образцом можем убедиться, что задача, возвращающая результат, на выходе даёт Ok(Ok(l)), тогда как задача, возвращающая ошибку, на выходе даёт Ok(Err(e)). Эти результаты даёт функция join_all(), обёртывающая Result любой успешно выполняемой задачи – такой, которая приводит к Ok, а не к панике. Если задача завершается без паники, то имеем Err(p). Таким образом, удаётся обработать успешное завершение, все ошибки и все варианты паники.

Если раскомментируете // 7 => panic!(«i:{} panicked!», i), то увидите, что паника захватывается при сопоставлении с образцом – в данном случае вывод паники часто может наслаиваться на вывод от других задач. Чтобы справиться с этой проблемой, потребуется написать собственный обработчик паник, использующий блокировку stdout.

Вот вывод. Операторы fallible(n) могут идти в любом порядке:

Tasks created
fallible(1)
fallible(4)
fallible(2)
fallible(0)
fallible(5)
fallible(6)
fallible(3)
fallible(7)
Ok(Ok('a')) => Letter: a
Ok(Err(ValueError(1))) => Err: V[1]
Ok(Ok('c')) => Letter: c
Ok(Err(TabError(3))) => Err: T[3]
Ok(Ok('e')) => Letter: e
Ok(Err(AttributeError(5))) => Err: A[5]
Ok(Err(AttributeError(6))) => Err: A[6]
Ok(Ok('h')) => Letter: h

Как вы заметите, после вывода fallible(n) во всех случаях наступает трёхсекундная пауза. В Tokio не предусмотрена автоматическая отмена, поэтому, даже несмотря на наличие ошибок, все задачи выполняются до завершения. (Обратите внимание: сейчас в Rust ведётся работа над структурированной конкурентностью, но информация об этом в лучшем случае фрагментарна — по таким вещам как tokio-scoped напрашивается вывод, что лучше было бы использовать async_scoped; мне не удалось найти ни одного полного рабочего примера с участием последней. Если вы знаете о каких-нибудь более хороших библиотеках – упомяните их в комментариях.)

Python-подход


В Python структурированная конкурентность поддерживается при помощи asyncio.TaskGroup. Если одна из задач в группе TaskGroup выбрасывает исключение, то все остальные задачи из этой группы автоматически отменяются. Имея концепцию группы и её области видимости, программист не обязан управлять всей этой рутиной.

Вот Python-версия функции fallible(). У меня не возникло никаких проблем с выводом от этого примера на Python (возможно, всё благодаря GIL), поэтому я не исследовал эту проблему (код для всех примеров на Python):

# fallible.py
import asyncio
from asyncio import CancelledError
from pprint import pformat


async def fallible(i: int) -> str:
    await asyncio.sleep(0.1)
    print(f"fallible({i})")
    match i:
        # Если закомментировать всё кроме '_' – имеем успех.
        case 1:
            raise ValueError(f"VE[{i}]")
        case 3:
            raise TabError(f"TE[{i}]")
        case 5 | 6:
            raise AttributeError(f"AE[{i}]")
        case _:
            await asyncio.sleep(3)
            # Преобразовать число в букву:
            return chr(ord("a") + i)


def display(e: Exception, msg: str = ""):
    print(f"{msg}{type(e).__name__}")
    if not isinstance(e, CancelledError):
        print(
            f"  {pformat(e.args, indent= 2, width=47)}"
        )

Единственный вариант, в котором задача fallible() выполняется успешно – это завершение case _.

Функция display() предоставляет (и форматирует) информацию об Exception, но выводит args только в том случае, если это не CancelledError (которая, в свою очередь, не содержит args).
Сначала давайте рассмотрим те исключения, что были выданы задачами, созданными внутри TaskGroup:

# exception_groups_1.py
import asyncio
from fallible import fallible, display


async def main() -> None:
    tasks = []
    try:
        # выдать RuntimeError("Outside TaskGroup")
        async with asyncio.TaskGroup() as tg:
            # выдать RuntimeError("In TaskGroup")
            tasks = [
                tg.create_task(
                    fallible(i),
                    name=f"Task {i}",
                )
                for i in range(8)
            ]
        print("No exceptions")
    except Exception as e:
        display(e)
    finally:
        print("- Tasks Complete -")

    for t in tasks:
        print(
            f"{t.get_name()} -> "
            + f"cancelled[{t.cancelled()}]"
        )
        if not t.cancelled():
            try:
                # в случае отказа выдается t.result() 
                print(f"\t{t.result()}")
            except Exception as e:
                display(e)


if __name__ == "__main__":
    asyncio.run(main())

При структурированной конкурентности, когда мы достигаем пределов области видимости async with asyncio.TaskGroup(), все задачи гарантированно будут завершены, причём, возможны такие варианты:
1. Задача завершается успешно и возвращает значение.
2. Задача завершается неуспешно и приводит к ошибке, выдавая при этом исключение.
3. Задача отменяется, либо из-за внешнего фактора, либо из-за того, что другая задача в той же группе выбросила исключение (в последнем случае отмена происходит автоматически).

В вышеприведённом примере исключение выбрасывается всегда, поэтому дело никогда не доходит до print(«No exceptions»). Мы просто отлавливаем обобщённое Exception, чтобы посмотреть, что произойдёт, и из вывода всё ясно:

fallible(0)
fallible(2)
fallible(6)
fallible(5)
fallible(7)
fallible(4)
fallible(1)
fallible(3)
ExceptionGroup
  ( 'unhandled errors in a TaskGroup',
  [ AttributeError('AE[6]'),
    AttributeError('AE[5]'),
    ValueError('VE[1]'),
    TabError('TE[3]')])
- Tasks Complete -
Task 0 -> cancelled[True]
Task 1 -> cancelled[False]
ValueError
  ('VE[1]',)
Task 2 -> cancelled[True]
Task 3 -> cancelled[False]
TabError
  ('TE[3]',)
Task 4 -> cancelled[True]
Task 5 -> cancelled[False]
AttributeError
  ('AE[5]',)
Task 6 -> cancelled[False]
AttributeError
  ('AE[6]',)
Task 7 -> cancelled[True]

Если задача выбрасывает исключение внутри TaskGroup, то это исключение “содержится в” ExceptionGroup, это один из типов Exception. Обратите внимание: все ошибки, производимые всеми задачами в группе собираются в список, находящийся в единственном ExceptionGroup, после того, как первый аргумент опишет исключение.

Как видите, после Tasks Complete (из-за выбрасывания исключений) автоматически отменяются все задачи, которые могли бы выполниться успешно и дать результат. Каждый из результирующих объектов Task либо отменяется, либо содержит ошибку, о которой ранее было сообщено.

Захват групп исключений


Если приходится разбирать исключения в группе вручную, то вся фича становится менее удобной. Поэтому была реализована возможность захватывать конкретные типы. Для этого вам понадобится ключевое слово except* (добавленное специально для поддержки групп исключений), а не просто except:

# exception_groups_2.py
import asyncio
from fallible import fallible, display
from asyncio import CancelledError


async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fallible(i))
                for i in range(8)
            ]
    except* ValueError as e:
        display(e)
    except* TabError as e:
        display(e)
    except* AttributeError as e:
        display(e)
        # Перебираем отдельно взятые исключения:
        for ex in e.exceptions:
            display(ex)
    except* CancelledError as e:  # Такого никогда не происходит
        display(e)
    finally:
        print("- Tasks Complete -")

    for t in tasks:
        print(f"{t.get_name()} -> ", end="")
        try:  # выбрасывает исключение, если нет t.result():
            print(f"{t.result()}")
        except Exception as e:
            display(e, "Exception: ")
        # CancelledError – это подкласс BaseException:
        except BaseException as e:
            display(e, "BaseException: ")


asyncio.run(main())

Из вывода понятно, как работает эта фича:

fallible(0)
fallible(2)
fallible(6)
fallible(5)
fallible(7)
fallible(4)
fallible(1)
fallible(3)
ExceptionGroup
  ( 'unhandled errors in a TaskGroup',
  [ValueError('VE[1]')])
ExceptionGroup
  ( 'unhandled errors in a TaskGroup',
  [TabError('TE[3]')])
ExceptionGroup
  ( 'unhandled errors in a TaskGroup',
  [ AttributeError('AE[6]'),
    AttributeError('AE[5]')])
AttributeError
  ('AE[6]',)
AttributeError
  ('AE[5]',)
- Tasks Complete -
Task-2 -> BaseException: CancelledError
Task-3 -> Exception: ValueError
  ('VE[1]',)
Task-4 -> BaseException: CancelledError
Task-5 -> Exception: TabError
  ('TE[3]',)
Task-6 -> BaseException: CancelledError
Task-7 -> Exception: AttributeError
  ('AE[5]',)
Task-8 -> Exception: AttributeError
  ('AE[6]',)
Task-9 -> BaseException: CancelledError

Каждое условие except* выбирает все исключения данного конкретного типа и помещает их в другую ExceptionGroup. При помощи except* AttributeError можно сложить множество исключений одного и того же типа в результирующей ExceptionGroup.

Можно перебирать исключения, содержащиеся внутри ExceptionGroup, при помощи e.exceptions, но, как правило, нас волнует, случилось ли AttributeError – и мы пишем код для обработки этой ситуации, а не разбираемся в произошедшем далее.

По каждой задаче здесь содержится конкретная информация о её возвращаемом значении/отмене/исключении. Также мы видим, каким образом CancelledError является типом BaseException, а не типом его подкласса Exception. Дело в том, что CancelledError — это механизм для реализации отмены, а не для ситуаций, которые обычно встречаются в программе (точно так же исключения используются внутри программы для завершения циклов for).
Область видимости, создаваемая при помощи структурированной конкурентности, не просто избавляет программиста от необходимости писать некие явные «объединения», которые доводили бы все задачи до завершения. Такая область видимости также гарантирует правильную автоматическую отмену моей задачи, если она выполнится неудачно, а ещё собирание ошибок, сгенерированных всеми задачами. Именно поэтому в данном введении по структурированной конкурентности проводится аналогия с тем, как мы постепенно научились работать с функциями.

Комментарии (3)


  1. Alesh
    27.10.2023 09:03

    Статья на тему как найти себе приключений на равном месте? По моему правило везде одно. Если в короутине необработанное исключение, программа должна быть завершена аварийно.

    Правильно разрешить исключение одной задачи, если задачи в группе разнородны да еще и влияют на результат друг друга, вряд ли возможно.


  1. Pab10
    27.10.2023 09:03
    +1

    Не хватает тега Python. Для себя уточнил возможности asyncio про который 100 раз слышал, но использовать не доводилось. Выглядит приятно и понятно.


    1. funca
      27.10.2023 09:03

      Это появилось относительно недавно (по питонячим меркам) - в python 3.11 и ещё не сильно распространено.

      Гвидо утащил позаимствовал TaskGroup из trio (в оригинале "nurseries"). Для более ранних версий питона можно использовать quattro, но разумеется без нового синтаксиса "except*" для обработки исключений.

      asyncio.gather(), которой раньше выполнял аналогичную функцию, теперь использовать не рекомендуется. У него в реализации с рождения есть несколько неприятных багов, которые ни кто так и не отважился починить (чтобы не ломать обратную совместимость).