За время менторства начинающих разработчиков заметил, что большинство вопросов связаны с темами: конкурентность, параллелизм, асинхронность. Их часто задают на собеседованиях, а в работе эти знания позволяют писать более эффективные и производительные системы. Поэтому я решил систематизировать свое видение и опыт в виде статьи.

Цель - c помощью примеров рассказать:

  • О потоках, процессах и корутинах.

  • О Global Interpreter Lock и накладываемых им ограничениях.

  • CPU и I/O bound нагрузке.

  • В чем польза от ThreadPool и ProcessPool.

Задача №1: Работа с JSON или CPU-bound task

Рассмотрим функцию:

# CPU bound task: generate list and dump it into JSON
def work(size):
    json.dumps(list(range(size)))

Она делает 2 вещи: генерирует список и превращает его в JSON. Подобный код - отличный пример CPU bound задачи - задачи, скорость выполнения которой зависит от мощности процессора.

Представим что нам нужно выполнить эту тяжеловесную задачу N раз. Последовательный алгоритм будет выглядеть так:

# Do work sequentially, one by one
def sequential(size, count):
    for _ in range(count):
        work(size)

Очевидно, что последовательное выполнение кода неэффективно. Какие варианты ускорить программу нам доступны?

Вариант №1 - создать по одному потоку на каждый вызов функции и попробовать добиться параллелизма:

# Do work using OS Threads
def run_threads(size, executionUnitsCount):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(work, [size] * executionUnitsCount)

Вариант №2 - вместо потоков создавать процесс:

# Do work using OS Processes
def run_processes(size, executionUnitsCount):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(work, [size] * executionUnitsCount)

В обоих случаях используются executor классы из пакета сoncurrent.futures. Если использовать пакеты multithreading и multiprocessing то код бы выглядел примерно так:

Hidden text
# Do work using OS Threads
def run_threads(size, executionUnitsCount):
    threads = [threading.Thread(target=work, args=(size,)) for _ in range(executionUnitsCount)]

    for t in threads:
        t.start()
    
    for t in threads:
        t.join()

# Do work using OS Processes
def run_processes(size, executionUnitsCount):
    processes = [multiprocessing.Process(target=work, args=(size,)) for _ in range(executionUnitsCount)]
    
    for p in processes:
        p.start()
    
    for p in processes:
        p.join()

Код тестирующий производительность всех реализаций:

if __name__ == '__main__':
    # Disable GC for better benchmarking and avoid pauses
    gc.disable()
    
    jsonSize = 1000000
    testCases = [ (jsonSize,  i) for i in range(1, 11)]
    variants = [sequential, run_threads, run_processes]

    for i, t in enumerate(testCases):
        size, executionUnitsCount = t
        print(f"Parallelism: {executionUnitsCount}, JSON Size: {size}")
        
        for j, variant in enumerate(variants):        
            start = time.perf_counter()

            variant(size, executionUnitsCount)

            end = time.perf_counter()

            print(f"{variant.__name__}, elapsed: {round(end - start, 2)}")
        print()

Результаты:

Результаты выполнения теста. Ось X - количество задач. Ось Y - время выполнения в секундах
Результаты выполнения теста. Ось X - количество задач. Ось Y - время выполнения в секундах

Какие выводы можно сделать?

  • Использование потоков для параллельного исполнения CPU bound задач не дает преимущества, результаты сопоставимы с кодом не использующим потоки и работающим последовательно. С чем это связано? Все дело в GIL - механизме синхронизации потоков, именно он не позволяет программе "набрать мощность" и выполнять потоки параллельно.

  • На процессы GIL не распространяется и создав дочерние процессы мы начинаем утилизировать ресурсы процессора по максимуму и как следствие видим уменьшение времени исполнения на графике.

  • Максимальный параллелизм которого можно добиться c помощью процессов равен количеству физических ядер CPU. На графике можно заметить что время исполнения теста Processes растет примерно лесенкой с шагом 4 - это количество физических ядер CPU на моем ноутбуке.

Задача №2: Работа с внешним API или IO-bound task

Помимо CPU bound существуют задачи IO bound. В такой задаче производительность зависит не от процессора а от подсистемы ввода-вывода, а также устройств с которыми мы взаимодействуем, например файловая система или сеть.

В примере ниже я буду эмулировать IO нагрузку через sleep (представьте что вместо sleep - запрос по HTTP к API вашего любимого сервиса).

def work(latency):
    time.sleep(latency)

Задача: сделать N вызовов функции (например, мы сервис агрегатор услуг и ходим к партнерам за данными). Последовательный вариант выполнения задачи будет выглядеть так:

def sequential(latency, count):
    for _ in range(count):
        work(latency)

Конкурентная реализация на потоках:

# Do work using OS Threads
def run_threads(latency, executionUnitsCount):
    with concurrent.futures.ThreadPoolExecutor(max_workers=executionUnitsCount) as executor:
        executor.map(work, [latency] * executionUnitsCount)

Конкурентная реализация на процессах:

# Do work using OS Processes
def run_processes(latency, executionUnitsCount):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(work, [latency] * executionUnitsCount)

Конкурентная реализация на основе asyncio:

def run_async_io(latency, executionUnitsCount):
    asyncio.run(async_io_tasks(latency, executionUnitsCount))


async def async_work(latency):
    await asyncio.sleep(latency)


async def async_io_tasks(latency, executionUnitsCount):
    tasks = [asyncio.create_task(async_work(latency)) for _ in range(executionUnitsCount)]
    
    await asyncio.gather(*tasks)

Бенчмарк (Чтобы показать разницу между подходами тестируем от 1000 до 2000 конкурентных задач):

if __name__ == '__main__':
    testCases = [ (0.1,  i) for i in range(1000, 20001, 200)]
    variants = [run_threads, run_async_io, run_processes]
    
    for i, t in enumerate(testCases):
        latency, executionUnitsCount = t
        print(f"Parallelism\: {executionUnitsCount}")
        
        for j, variant in enumerate(variants):
            start = time.perf_counter()

            r = variant(latency, executionUnitsCount)

            end = time.perf_counter()
            print(f"{case.__name__}, elapsed: {round(end - start, 2)}")
        print()

Результаты:

Результаты выполнения теста. Ось X - количество конкурентных задач (процессов и потоков). Ось Y - время выполнения в секундах
Результаты выполнения теста. Ось X - количество конкурентных задач (процессов и потоков). Ось Y - время выполнения в секундах

Увеличим масштаб и сравним потоки и корутины:

Результаты выполнения теста. Ось X - количество конкурентных задач (процессов и потоков). Ось Y - время выполнения в секундах
Результаты выполнения теста. Ось X - количество конкурентных задач (процессов и потоков). Ось Y - время выполнения в секундах

Какие выводы можно сделать?

  • Процессы в чистом виде непригодны для подобного класса задач, слишком большой оверхед на создание процесса и его убийство, взаимодействовать с ОС так часто - дорогое удовольствие.

  • Потоки справились намного лучше процессов, и GIL в данной задаче не стал помехой, так как он запрещает "исполняться" нескольким процессом одновременно, а в случае IO нагрузки потоки дольше находятся в ожидании чем в работе . Но оверхед по сравнению с корутинами присутствует так как мы взаимодействуем с ОС и аллоцируем память.

  • Корутины - абсолютный лидер, за счет того что ими управляет рантайм языка а не ОС, к тому же с точки зрения памяти корутине нужно ее меньше чем потоку.

Есть ли ограничения по количеству процессов/потоков/корутин?

В примерах выше мы запускали разное количество задач и у любопытного читателя может возникнуть вопрос: Сколько можно создавать процессов и потоков в программе? Чем мы платим за их создание? Ответ довольно прозаичен: мы платим оперативной памятью нашей машины. Для каждого процесса и потока ОС резервирует определенное количество памяти, а так как память конечна то без остановки создавая процесс или поток мы рано или поздно столкнемся с OOM (Out of memory error).

Чтобы избегать подобных ошибок в production системах ограничивается количество активных процессов и потоков с помощью популярных механизмов ThreadPool и ProcessPool.

Итоги

В данной статье я показал практическое применение потоков, процессов и корутин. На примерах разобрались в каких задач применим тот или иной инструмент. Для того чтобы упростить код были выбраны простые синтетические примеры, но при этом не противоречащие production коду который мы встречаем ежедневно в работе.

Полный код из статьи доступен на GitHub.

Статьи для дополнительного ознакомления:

Спасибо что прочитали до конца, буду рад любой конструктивной обратной связи в комментариях!

Комментарии (7)


  1. evgenyk
    12.11.2023 19:13
    +2

    • Процессы в чистом виде непригодны для подобного класса задач, слишком большой оверхед на создание процесса и его убийство, взаимодействовать с ОС так часто - дорогое удовольствие.

    ИМХО, процессы отлично работают для таких задач, только писать нужно немножко по другому. Если мы хотим скажем обрабатывать что-то в нескольких параллельных ветках, то пишем воркер, который будучи запущенным, ждет на вход сообщение с входной информацией, обработав ее, он посылает сообщение с результатом и ждет следующего сообщения. В начале работы запускаем сколько хотим воркеров, в конце работы завершаем их работу.

    Таким образом создание процессов происходит один раз для каждого процесса, а все преимущества процессов остаются.


    1. lebron32rus Автор
      12.11.2023 19:13

      Спасибо за комментарий!

      Можете привести пример кода, как можно под задачу из статьи адаптировать процессы чтобы они работали эффективнее?


      1. GoDevSeoTaxi
        12.11.2023 19:13

        В качестве примера после комментария@evgenyk - мне лично вспоминается svchost.exe и services.exe


      1. JOHN_16
        12.11.2023 19:13

        Речь идет скорее всего о модуле queue в целом, и в частности о реализации с multiprocessing. Примеры легко найти в интернет поисковиках.


  1. john_2013
    12.11.2023 19:13

    Получается, при наличии асинхронности потоки полностью неактуальны для любых задач?


    1. fishHook
      12.11.2023 19:13

      Если я правильно помню, питоньи либы написанные на Си имеют возможность "отпустить" GIL, то есть какую-нибудь (условно!) функцию из numpy распараллелить по потокам, это может иметь смысл


      1. Ingeniosus
        12.11.2023 19:13

        Даже стандарный multiprocessing.pool не ограничен GIL. Его логика скорее как и у gc, упросить написание кода. Т.е. асинк простой и защищен разрабами языка, а вот pool уже предполагает что "тут все взрослые".