Данная статья написана новичком для новичков, т.е. для тех, кто только начинает изучать возможности многопроцессорного и многопоточного программирования в Python. Статья намеренно пишется без воды и со скомканной теорией, в стиле шпаргалки.
multiprocessing
Модуль multiprocessing в Python предоставляет возможности для работы с многопроцессорным программированием, позволяя создавать и управлять процессами, обмениваться данными между процессами, использовать пулы процессов и другие механизмы для параллельного выполнения задач.
Некоторые ключевые функции и классы модуля multiprocessing:
Process: Класс для создания и управления процессами. Позволяет запускать функции в новом процессе.
Pool: Класс, предоставляющий пул процессов для выполнения задач параллельно. Позволяет управлять пулом процессов и распределять задачи между процессами.
Queue: Класс для обмена данными между процессами через очередь, обеспечивая безопасность при работе с разделяемыми данными.
Manager: Класс для управления разделяемыми объектами между процессами. Позволяет создавать разделяемые списки, словари, очереди и другие.
Lock, Event, Condition, Semaphore: Классы для синхронизации процессов и предотвращения состязаний (race conditions) при доступе к общим ресурсам.
Pipe: Механизм для обмена данными между двумя процессами через двусторонний канал.
cpu_count(): Функция для возвращения количества доступных в системе процессоров (ядер).
Использование модуля multiprocessing позволяет эффективно использовать ресурсы многопроцессорной системы, ускорить выполнение задач и реализовать параллельное выполнение вычислений. Однако следует учитывать особенности многопроцессорного программирования, такие как управление разделяемыми данными, синхронизация процессов и предотвращение возникновения конфликтов при доступе к общим ресурсам.
Так же сразу необходимо вспомнить о популярной функции map(), в multiprocessing.Pool, которая используется для распределения задач на выполнение между процессами в пуле. Она позволяет применить указанную функцию к каждому элементу входного списка последовательно или параллельно в нескольких процессах. Затем map() ожидает завершения всех задач и возвращает список результатов в том же порядке, в котором были переданы входные данные.
Пример использования map() с процессами из пула:
from multiprocessing import Pool
# Функция, которая будет применена к каждому элементу
def square(n):
return n * n
if __name__ == '__main__':
# Создание пула из 4 процессов
with Pool(processes=4) as pool:
# Входной список значений
values = [1, 2, 3, 4, 5]
# Применение функции square к каждому элементу в списке параллельно
results = pool.map(square, values)
print(results)
В приведенном примере функция square() применяется к каждому элементу входного списка values параллельно с использованием четырех процессов в пуле. После завершения выполнения всех задач метод map() возвращает список с результатами применения функции к каждому элементу входного списка.
map() удобно использовать для обработки данных в пуле процессов, когда требуется распределить задачи на выполнение параллельно и получить результаты после завершения всех задач.
Если же мы не знаем количество доступных ядер в нашей системе, то код можно написать с использованием упомянутой выше функции cpu_count():
from multiprocessing import Pool, cpu_count
# Функция, которая будет применена к каждому элементу
def square(n):
return n * n
if __name__ == '__main__':
# Создание пула из 4 процессов
with Pool(processes=cpu_count()) as pool:
# Входной список значений
values = [1, 2, 3, 4, 5]
# Применение функции square к каждому элементу в списке параллельно
results = pool.map(square, values)
print(results)
threading
Модуль threading в Python предоставляет инструменты для работы с потоками выполнения (threads) в многопоточном программировании. Потоки позволяют выполнять несколько задач в одном процессе параллельно, что помогает улучшить отзывчивость программы и увеличить общую производительность.
Некоторые ключевые функции и классы модуля threading:
Thread: Класс для создания и управления потоками выполнения. Позволяет запускать функции в новом потоке.
Lock, Event, Condition, Semaphore: Классы для синхронизации потоков и предотвращения состязаний (race conditions) при доступе к общим ресурсам.
Timer: Класс для выполнения функции через определенное время.
Barrier: Класс для организации точек синхронизации, где потоки могут остановиться и дождаться друг друга.
local: Класс для хранения данных в потоке-локальном хранилище, доступном только в рамках данного потока.
enumerate(): Функция для получения списка всех активных потоков в программе.
current_thread(): Функция для получения объекта текущего исполняющегося потока.
Использование модуля threading позволяет создавать и управлять потоками выполнения в Python, что полезно при реализации параллельных задач и улучшении обработки задач, которые могут быть выполнены независимо друг от друга. При работе с потоками важно учитывать потенциальные проблемы многопоточности, такие как состязания при доступе к общим ресурсам или возможные блокировки, и применять синхронизацию для избежания этих проблем.
Теперь для решения той же задачи используем модуль threading и создадим потоки для вызова функции square параллельно.
Вот переписанный вариант функции с использованием модуля threading:
import threading
# Функция, которая будет применена к каждому элементу
def square(n):
return n * n
if __name__ == '__main__':
# Входной список значений
values = [1, 2, 3, 4, 5]
# Список для хранения результатов
results = []
# Создание и запуск потоков для вызова функции square
threads = []
for value in values:
thread = threading.Thread(target=lambda x: results.append(square(x)), args=(value,))
thread.start()
threads.append(thread)
# Ожидание завершения всех потоков
for thread in threads:
thread.join()
print(results)
Этот код создает потоки для вызова функции square параллельно для каждого элемента входного списка values. Каждый поток выполняет функцию square для своего элемента и добавляет результат в список results. После запуска всех потоков, программa ожидает окончания выполнения каждого потока с помощью метода join(). В итоге, результаты будут выведены на экран.
В каких случаях нужно использовать threading, а в каких multiprocessing?
Использование модуля threading и multiprocessing в Python зависит от конкретной задачи и требований приложения. Вот несколько общих рекомендаций о том, в каких случаях лучше использовать каждый из этих подходов:
Использование модуля threading:
Потоки подходят для задач, которые блокируются часто (например, ввод-вывод операции), так как потоки разделяют один процесс и общую память.
Потоки имеют более низкий порог создания и уничтожения, поэтому их удобно использовать для коротких и быстрых операций.
Если задача требует обмена данными между потоками без каких-либо проблем с синхронизацией процессов
Использование модуля multiprocessing:
Мультипроцессинг особенно полезен в тех случаях, когда у вас есть задачи, которые критически нагружают процессор и не блокируются.
Если вам нужно обрабатывать большой объем данных параллельно и каждая задача требует отдельного процесса.
Когда программа работает на многопроцессорной системе и вы хотите использовать все ядра процессора на максимум.
В целом, при выборе между threading и multiprocessing стоит учитывать, что потоки используют общую память и могут работать быстрее, но требуют больше внимания к синхронизации и работают в рамках одного процесса. Мультипроцессинг, в свою очередь, работает в разных процессах и предпочтительнее при выполнении вычислительно интенсивных задач или обработке больших объемов данных.
Хороший пример
Пример выше так себе пример, честно говоря. И для демонстрации преимущества обоих модулей перед последовательным выполнением программы я покажу другой. Задача: скачать информацию о 20 персонажах «Звездных войн» из онлайн базы и записать в базу данных SQLite3.
Для этого я напишу функцию, которая будет получать json-объект с характеристиками персонажа и вызывать функцию для записи информации в нашу базу. И, соответственно, функцию, которая записывает полученную информацию в нашу базу. А так же еще три функции с последовательным выполнением первой функции, с использованием пула потоков и пула процессов.
Первая функция, которая получает данные, принимает лишь один аргумент- целое число. Так как к используемому API мы всегда делаем одинаковый запрос, меняя только ID персонажа: ttps://swapi.dev/api/peple/целое_число/.
Кроме того, в написанном мной коде используется модули logging и time для замера времени выполнения каждой функции и вывода этой информации. Вы можете использовать обычные принты, если хотите. И еще в данном случае я не использую модуль threading для работы с потоками, вместо него ThreadPool из multiprocessing.pool.
Пишем импорты:
import time
import sqlite3
import logging
import requests
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool
Настраиваем логирование:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Функция получения данных:
def get_peoples_data(number: int) -> None:
url = f"https://swapi.dev/api/people/{number}/"
response = requests.get(url)
data = response.json()
database_manage(data)
Функция записи в базу данных:
def database_manage(data: dict) -> None:
conn = sqlite3.connect('starwarscharacters.db')
cursor = conn.cursor()
cursor.execute('''SELECT name FROM sqlite_master WHERE type='table' AND name='characters' ''')
table_exists = cursor.fetchone()
if not table_exists:
cursor.execute('''CREATE TABLE characters
(name text,
height integer,
mass integer,
haircolor text,
skincolor text,
eyecolor text,
birthyear text,
gender text)''')
try:
cursor.execute('''INSERT INTO characters (name, height, mass, haircolor, skincolor, eyecolor, birthyear, gender)
VALUES (?,?,?,?,?,?,?,?)''',
(data['name'], data['height'], data['mass'], data['hair_color'], data['skin_color'],
data['eye_color'], data['birth_year'], data['gender']))
except:
pass
finally:
conn.commit()
conn.close()
Функция последовательного выполнения с замером времени и выводом лога:
def sequential_approach():
start = time.time()
input_value = [i for i in range(1, 21)]
for inp in input_value:
get_peoples_data(inp)
end = time.time()
logger.info(f'Time taken in seconds for sequential - {end - start}')
Функция с пулом процессов:
def high_load_map():
start = time.time()
input_value = [i for i in range(1, 21)]
with Pool(processes=cpu_count()) as pool:
pool.map(get_peoples_data, input_value)
end = time.time()
logger.info(f'Time taken in seconds - {end - start}')
Функция с пулом потоков:
def execution_with_threadpool():
pool = ThreadPool(processes=cpu_count() * 5)
input_value = [i for i in range(1, 21)]
start = time.time()
pool.map(get_peoples_data, input_value)
pool.close()
pool.join()
end = time.time()
logger.info(f'Time taken in seconds with threadpool - {end - start}')
Пул потоков тут создается с помощью ThreadPool(processes=cpu_count() * 5), где число потоков задается как умножение количества доступных ядер ЦП на 5. То есть если у вас есть 4 ядра CPU, то будет создан пул из 20 потоков, что должно обеспечить максимальную скорость выполнения некоторых задач. Однако, забегая вперед, скажу, что эта функция оказалась медленнее функции с пулом процессов.
Ну и стандартная финалочка с вызовом функций:
if __name__ == '__main__':
#sequential_approach()
high_load_map()
#execution_with_threadpool()
Так как каждая из этих функций по итогу выполнения приводит к записи в одну базу данных, я вызывал их поочередно, комментируя остальные.
Вот так выглядит код целиком.
import time
import sqlite3
import logging
import requests
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_peoples_data(number: int) -> None:
url = f"https://swapi.dev/api/people/{number}/"
response = requests.get(url)
data = response.json()
database_manage(data)
def database_manage(data: dict) -> None:
conn = sqlite3.connect('starwarscharacters.db')
cursor = conn.cursor()
cursor.execute('''SELECT name FROM sqlite_master WHERE type='table' AND name='characters' ''')
table_exists = cursor.fetchone()
if not table_exists:
cursor.execute('''CREATE TABLE characters
(name text,
height integer,
mass integer,
haircolor text,
skincolor text,
eyecolor text,
birthyear text,
gender text)''')
try:
cursor.execute('''INSERT INTO characters (name, height, mass, haircolor, skincolor, eyecolor, birthyear, gender)
VALUES (?,?,?,?,?,?,?,?)''',
(data['name'], data['height'], data['mass'], data['hair_color'], data['skin_color'],
data['eye_color'], data['birth_year'], data['gender']))
except:
pass
finally:
conn.commit()
conn.close()
def sequential_approach():
start = time.time()
input_value = [i for i in range(1, 21)]
for inp in input_value:
get_peoples_data(inp)
end = time.time()
logger.info(f'Time taken in seconds for sequential - {end - start}')
def high_load_map():
start = time.time()
input_value = [i for i in range(1, 21)]
with Pool(processes=cpu_count()) as pool:
pool.map(get_peoples_data, input_value)
end = time.time()
logger.info(f'Time taken in seconds - {end - start}')
def execution_with_threadpool():
pool = ThreadPool(processes=cpu_count() * 5)
input_value = [i for i in range(1, 21)]
start = time.time()
pool.map(get_peoples_data, input_value)
pool.close()
pool.join()
end = time.time()
logger.info(f'Time taken in seconds with threadpool - {end - start}')
if __name__ == '__main__':
#sequential_approach()
high_load_map()
#execution_with_threadpool()
Не буду приводить наглядные результаты исследования скорости всех функций, какая из них оказалась самой шустрой вы уже и так поняли. А вот почему, я ответить не могу, так как сам всего лишь изучаю этот вопрос. По этой же причине буду рад любым комментариям от ветеранов потоков и процессов. Спасибо за внимание.
ggmaster
Не встретил ни слова про GIL, что является, на мой взгляд, довольно важной темой при рассмотрении способов распараллеливания кода на Python.
temabed Автор
Спасибо за комментарий. Намеренно оставил теоретическую часть о потоках, процессах и GIL в Python за скобками, о чем предупредил в начале статьи. Это всего лишь статья-шпаргалка по двум модулям, чтобы вспомнить основные методы, в случае необходимости. А писать теорию про потоки, процессы и GIL я еще не дорос.