Данная статья написана новичком для новичков, т.е. для тех, кто только начинает изучать возможности многопроцессорного и многопоточного программирования в Python. Статья намеренно пишется без воды и со скомканной теорией, в стиле шпаргалки.

multiprocessing

Модуль multiprocessing в Python предоставляет возможности для работы с многопроцессорным программированием, позволяя создавать и управлять процессами, обмениваться данными между процессами, использовать пулы процессов и другие механизмы для параллельного выполнения задач.

Некоторые ключевые функции и классы модуля multiprocessing:

  1. Process: Класс для создания и управления процессами. Позволяет запускать функции в новом процессе.

  2. Pool: Класс, предоставляющий пул процессов для выполнения задач параллельно. Позволяет управлять пулом процессов и распределять задачи между процессами.

  3. Queue: Класс для обмена данными между процессами через очередь, обеспечивая безопасность при работе с разделяемыми данными.

  4. Manager: Класс для управления разделяемыми объектами между процессами. Позволяет создавать разделяемые списки, словари, очереди и другие.

  5. Lock, Event, Condition, Semaphore: Классы для синхронизации процессов и предотвращения состязаний (race conditions) при доступе к общим ресурсам.

  6. Pipe: Механизм для обмена данными между двумя процессами через двусторонний канал.

  7. cpu_count(): Функция для возвращения количества доступных в системе процессоров (ядер).

Использование модуля multiprocessing позволяет эффективно использовать ресурсы многопроцессорной системы, ускорить выполнение задач и реализовать параллельное выполнение вычислений. Однако следует учитывать особенности многопроцессорного программирования, такие как управление разделяемыми данными, синхронизация процессов и предотвращение возникновения конфликтов при доступе к общим ресурсам.

Так же сразу необходимо вспомнить о популярной функции map(), в multiprocessing.Pool, которая используется для распределения задач на выполнение между процессами в пуле. Она позволяет применить указанную функцию к каждому элементу входного списка последовательно или параллельно в нескольких процессах. Затем map() ожидает завершения всех задач и возвращает список результатов в том же порядке, в котором были переданы входные данные.

Пример использования map() с процессами из пула:

from multiprocessing import Pool

# Функция, которая будет применена к каждому элементу
def square(n):
    return n * n

if __name__ == '__main__':
    # Создание пула из 4 процессов
    with Pool(processes=4) as pool:
        # Входной список значений
        values = [1, 2, 3, 4, 5]
        # Применение функции square к каждому элементу в списке параллельно
        results = pool.map(square, values)
        print(results)

В приведенном примере функция square() применяется к каждому элементу входного списка values параллельно с использованием четырех процессов в пуле. После завершения выполнения всех задач метод map() возвращает список с результатами применения функции к каждому элементу входного списка.

map() удобно использовать для обработки данных в пуле процессов, когда требуется распределить задачи на выполнение параллельно и получить результаты после завершения всех задач.

Если же мы не знаем количество доступных ядер в нашей системе, то код можно написать с использованием упомянутой выше функции cpu_count():

from multiprocessing import Pool, cpu_count

# Функция, которая будет применена к каждому элементу
def square(n):
    return n * n

if __name__ == '__main__':
    # Создание пула из 4 процессов
    with Pool(processes=cpu_count()) as pool:
        # Входной список значений
        values = [1, 2, 3, 4, 5]
        # Применение функции square к каждому элементу в списке параллельно
        results = pool.map(square, values)
        print(results)

threading

Модуль threading в Python предоставляет инструменты для работы с потоками выполнения (threads) в многопоточном программировании. Потоки позволяют выполнять несколько задач в одном процессе параллельно, что помогает улучшить отзывчивость программы и увеличить общую производительность.

Некоторые ключевые функции и классы модуля threading:

  1. Thread: Класс для создания и управления потоками выполнения. Позволяет запускать функции в новом потоке.

  2. Lock, Event, Condition, Semaphore: Классы для синхронизации потоков и предотвращения состязаний (race conditions) при доступе к общим ресурсам.

  3. Timer: Класс для выполнения функции через определенное время.

  4. Barrier: Класс для организации точек синхронизации, где потоки могут остановиться и дождаться друг друга.

  5. local: Класс для хранения данных в потоке-локальном хранилище, доступном только в рамках данного потока.

  6. enumerate(): Функция для получения списка всех активных потоков в программе.

  7. current_thread(): Функция для получения объекта текущего исполняющегося потока.

Использование модуля threading позволяет создавать и управлять потоками выполнения в Python, что полезно при реализации параллельных задач и улучшении обработки задач, которые могут быть выполнены независимо друг от друга. При работе с потоками важно учитывать потенциальные проблемы многопоточности, такие как состязания при доступе к общим ресурсам или возможные блокировки, и применять синхронизацию для избежания этих проблем.

Теперь для решения той же задачи используем модуль threading и создадим потоки для вызова функции square параллельно.

Вот переписанный вариант функции с использованием модуля threading:

import threading

# Функция, которая будет применена к каждому элементу
def square(n):
    return n * n

if __name__ == '__main__':
    # Входной список значений
    values = [1, 2, 3, 4, 5]
    
    # Список для хранения результатов
    results = []
    
    # Создание и запуск потоков для вызова функции square
    threads = []
    for value in values:
        thread = threading.Thread(target=lambda x: results.append(square(x)), args=(value,))
        thread.start()
        threads.append(thread)
    
    # Ожидание завершения всех потоков
    for thread in threads:
        thread.join()
    
    print(results)

Этот код создает потоки для вызова функции square параллельно для каждого элемента входного списка values. Каждый поток выполняет функцию square для своего элемента и добавляет результат в список results. После запуска всех потоков, программa ожидает окончания выполнения каждого потока с помощью метода join(). В итоге, результаты будут выведены на экран.

В каких случаях нужно использовать threading, а в каких multiprocessing?

Использование модуля threading и multiprocessing в Python зависит от конкретной задачи и требований приложения. Вот несколько общих рекомендаций о том, в каких случаях лучше использовать каждый из этих подходов:

Использование модуля threading:

  1. Потоки подходят для задач, которые блокируются часто (например, ввод-вывод операции), так как потоки разделяют один процесс и общую память.

  2. Потоки имеют более низкий порог создания и уничтожения, поэтому их удобно использовать для коротких и быстрых операций.

  3. Если задача требует обмена данными между потоками без каких-либо проблем с синхронизацией процессов

Использование модуля multiprocessing:

  1. Мультипроцессинг особенно полезен в тех случаях, когда у вас есть задачи, которые критически нагружают процессор и не блокируются.

  2. Если вам нужно обрабатывать большой объем данных параллельно и каждая задача требует отдельного процесса.

  3. Когда программа работает на многопроцессорной системе и вы хотите использовать все ядра процессора на максимум.

В целом, при выборе между threading и multiprocessing стоит учитывать, что потоки используют общую память и могут работать быстрее, но требуют больше внимания к синхронизации и работают в рамках одного процесса. Мультипроцессинг, в свою очередь, работает в разных процессах и предпочтительнее при выполнении вычислительно интенсивных задач или обработке больших объемов данных.

Хороший пример

Пример выше так себе пример, честно говоря. И для демонстрации преимущества обоих модулей перед последовательным выполнением программы я покажу другой. Задача: скачать информацию о 20 персонажах «Звездных войн» из онлайн базы и записать в базу данных SQLite3.

Для этого я напишу функцию, которая будет получать json-объект с характеристиками персонажа и вызывать функцию для записи информации в нашу базу. И, соответственно, функцию, которая записывает полученную информацию в нашу базу. А так же еще три функции с последовательным выполнением первой функции, с использованием пула потоков и пула процессов.

Первая функция, которая получает данные, принимает лишь один аргумент- целое число. Так как к используемому API мы всегда делаем одинаковый запрос, меняя только ID персонажа: ttps://swapi.dev/api/peple/целое_число/.

Кроме того, в написанном мной коде используется модули logging и time для замера времени выполнения каждой функции и вывода этой информации. Вы можете использовать обычные принты, если хотите. И еще в данном случае я не использую модуль threading для работы с потоками, вместо него ThreadPool из multiprocessing.pool.

Пишем импорты:

import time
import sqlite3
import logging
import requests
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool

Настраиваем логирование:

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Функция получения данных:

def get_peoples_data(number: int) -> None:
    url = f"https://swapi.dev/api/people/{number}/"
    response = requests.get(url)
    data = response.json()
    database_manage(data)

Функция записи в базу данных:

def database_manage(data: dict) -> None:
    conn = sqlite3.connect('starwarscharacters.db')
    cursor = conn.cursor()

    cursor.execute('''SELECT name FROM sqlite_master WHERE type='table' AND name='characters' ''')
    table_exists = cursor.fetchone()

    if not table_exists:
        cursor.execute('''CREATE TABLE characters
                            (name text, 
                            height integer, 
                            mass integer, 
                            haircolor text, 
                            skincolor text, 
                            eyecolor text, 
                            birthyear text, 
                            gender text)''')

    try:
        cursor.execute('''INSERT INTO characters (name, height, mass, haircolor, skincolor, eyecolor, birthyear, gender)
        VALUES (?,?,?,?,?,?,?,?)''',
                       (data['name'], data['height'], data['mass'], data['hair_color'], data['skin_color'],
                        data['eye_color'], data['birth_year'], data['gender']))

    except:
        pass
    finally:
        conn.commit()
        conn.close()

Функция последовательного выполнения с замером времени и выводом лога:

def sequential_approach():
    start = time.time()
    input_value = [i for i in range(1, 21)]

    for inp in input_value:
        get_peoples_data(inp)
    end = time.time()
    logger.info(f'Time taken in seconds for sequential - {end - start}')

Функция с пулом процессов:

def high_load_map():
    start = time.time()
    input_value = [i for i in range(1, 21)]

    with Pool(processes=cpu_count()) as pool:
        pool.map(get_peoples_data, input_value)

    end = time.time()
    logger.info(f'Time taken in seconds - {end - start}')

Функция с пулом потоков:

def execution_with_threadpool():
    pool = ThreadPool(processes=cpu_count() * 5)
    input_value = [i for i in range(1, 21)]

    start = time.time()
    pool.map(get_peoples_data, input_value)
    pool.close()
    pool.join()
    end = time.time()
    logger.info(f'Time taken in seconds with threadpool - {end - start}')

Пул потоков тут создается с помощью ThreadPool(processes=cpu_count() * 5), где число потоков задается как умножение количества доступных ядер ЦП на 5. То есть если у вас есть 4 ядра CPU, то будет создан пул из 20 потоков, что должно обеспечить максимальную скорость выполнения некоторых задач. Однако, забегая вперед, скажу, что эта функция оказалась медленнее функции с пулом процессов.

Ну и стандартная финалочка с вызовом функций:

if __name__ == '__main__':
    #sequential_approach()
    high_load_map()
    #execution_with_threadpool()

Так как каждая из этих функций по итогу выполнения приводит к записи в одну базу данных, я вызывал их поочередно, комментируя остальные.

Вот так выглядит код целиком.
import time
import sqlite3
import logging
import requests
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_peoples_data(number: int) -> None:
    url = f"https://swapi.dev/api/people/{number}/"
    response = requests.get(url)
    data = response.json()
    database_manage(data)

def database_manage(data: dict) -> None:
    conn = sqlite3.connect('starwarscharacters.db')
    cursor = conn.cursor()

    cursor.execute('''SELECT name FROM sqlite_master WHERE type='table' AND name='characters' ''')
    table_exists = cursor.fetchone()

    if not table_exists:
        cursor.execute('''CREATE TABLE characters
                            (name text, 
                            height integer, 
                            mass integer, 
                            haircolor text, 
                            skincolor text, 
                            eyecolor text, 
                            birthyear text, 
                            gender text)''')

    try:
        cursor.execute('''INSERT INTO characters (name, height, mass, haircolor, skincolor, eyecolor, birthyear, gender)
        VALUES (?,?,?,?,?,?,?,?)''',
                       (data['name'], data['height'], data['mass'], data['hair_color'], data['skin_color'],
                        data['eye_color'], data['birth_year'], data['gender']))

    except:
        pass
    finally:
        conn.commit()
        conn.close()

def sequential_approach():
    start = time.time()
    input_value = [i for i in range(1, 21)]

    for inp in input_value:
        get_peoples_data(inp)
    end = time.time()
    logger.info(f'Time taken in seconds for sequential - {end - start}')

def high_load_map():
    start = time.time()
    input_value = [i for i in range(1, 21)]

    with Pool(processes=cpu_count()) as pool:
        pool.map(get_peoples_data, input_value)

    end = time.time()
    logger.info(f'Time taken in seconds - {end - start}')

def execution_with_threadpool():
    pool = ThreadPool(processes=cpu_count() * 5)
    input_value = [i for i in range(1, 21)]

    start = time.time()
    pool.map(get_peoples_data, input_value)
    pool.close()
    pool.join()
    end = time.time()
    logger.info(f'Time taken in seconds with threadpool - {end - start}')


if __name__ == '__main__':
    #sequential_approach()
    high_load_map()
    #execution_with_threadpool()

Не буду приводить наглядные результаты исследования скорости всех функций, какая из них оказалась самой шустрой вы уже и так поняли. А вот почему, я ответить не могу, так как сам всего лишь изучаю этот вопрос. По этой же причине буду рад любым комментариям от ветеранов потоков и процессов. Спасибо за внимание.

Комментарии (2)


  1. ggmaster
    03.04.2024 13:15

    Не встретил ни слова про GIL, что является, на мой взгляд, довольно важной темой при рассмотрении способов распараллеливания кода на Python.


    1. temabed Автор
      03.04.2024 13:15

      Спасибо за комментарий. Намеренно оставил теоретическую часть о потоках, процессах и GIL в Python за скобками, о чем предупредил в начале статьи. Это всего лишь статья-шпаргалка по двум модулям, чтобы вспомнить основные методы, в случае необходимости. А писать теорию про потоки, процессы и GIL я еще не дорос.