Я довольно давно пишу на Python и во многих проектах использовал multiprocessing — пакет стандартной библиотеки языка Python, который предоставляет интерфейс для работы с процессами, очередями, пулами процессов и многими другими удобными инструментами для параллельного программирования. В какой-то момент я понял, что мне не хватает более детального понимания работы этой библиотеки.
Мне захотелось залезть в исходники multiprocessing, разобраться и заодно написать статью. Данная статья в основном рассчитана на новичков в Python и тех, кто хочет подробнее разобраться в том, как именно создаются процессы и пулы в Python и погрузиться в детали реализации.
В статье я не буду рассказывать что такое процессы и зачем они нужны. Почитать самую базу про операционные системы и процессы можно, например, тут и тут. Также важно уточнить, что весь приведенный в статье код соответствует версии Python 3.11.4
Содержание
Создание нового процесса в ОС
Способы создания нового процесса в multiprocessing
Контекст (Context)
-
Процесс (Process)
Создание процесса через fork
Создание процесса через spawn
Завершение процесса
Пул процессов (ProcessPool)
Создание нового процесса в ОС
Процессы в ОС создаются с помощью системных вызовов - низкоуровневых функций операционной системы, позволяющих пользовательским программам взаимодействовать с ОС.
В UNIX-подобных системах для создания нового процесса используется системный вызов fork. На самом деле существует целое семейство системных вызовов: fork, vfork, clone. Их суть очень похожа и в данной статье нам достаточно рассмотреть только fork.
Системный вызов fork создает копию текущего процесса, возвращает ноль в дочернем процессе и PID ребенка в родительском процессе. Важно сказать, что фактического копирования и выделения памяти при создании процесса не происходит. Вместо этого используется технология copy-on-write, которая создаёт копию страницы памяти только при попытке записи в эту страницу. Это позволяет уменьшить количество потребляемой процессами памяти и значительно ускорить создание процессов.
В Windows для создания процессов используется системный вызов CreateProcess из WinApi. В отличие от UNIX-подобных систем, в Windows в созданный процесс сразу загружается программа, переданная в аргументы системного вызова.
Способы создания нового процесса в multiprocessing
В пакете multiprocessing есть 3 основных метода для создания нового процесса: fork, spawn и forkserver.
fork:
Как понятно из названия, использует системный вызов fork для создания нового процесса
В multiprocessing является способом создания процессов по умолчанию на POSIX системах кроме macOS
Не поддерживается на windows
spawn:
Запускает новый процесс, используя переданную в него команду. В нашем случае будет передаваться команда для запуска процесса с интепретатором Python. В аргументы интепретатору передается путь к запускаемому файлу, а также некоторые другие служебные аргументы
В multiprocessing является способом создания процессов по умолчанию на macOS и windows
Обычно работает медленнее, чем fork
forkserver:
Создаётся серверный процесс, который создаёт процессы методом fork. Когда требуется новый процесс, родительский процесс соединяется с сервером и запрашивает у него форк нового процесса. Этот метод сочетает в себе скорость работы fork с хорошей надежностью (так как серверный процесс, от которого создается дочерний, имеет простое состояние)
Не поддерживается на windows
В данной статье я разберу только первые два метода, так как они являются самыми часто используемыми.
Контекст (Context)
Первое, с чем предстоит разобраться при погружении в реализацию multiprocessing - это класс контекста. Контекст имеет такое же API, что и сам модуль multiprocessing, однако он позволяет установить и зафиксировать способ создания новых процессов: fork, spawn или forkserver.
import multiprocessing as mp
if __name__ == '__main__':
ctx_spawn = mp.get_context('spawn')
ctx_fork = mp.get_context('fork')
Контекст позволяет использовать разные методы создания процесса внутри одной программы. В примере выше все процессы, созданные через объект контекста ctx_spawn
, будут созданы методом spawn
, а через ctx_fork
, соответственно, методом fork
.
Под каждый из методов создания процессов реализован класс контекста, наследованный от BaseContext
. Эти классы отличаются только классом процесса, который используется для порождения дочерних процессов:
class ForkContext(BaseContext):
_name = 'fork'
Process = ForkProcess # Все созданные процессы будут иметь тип ForkProcess
class SpawnContext(BaseContext):
_name = 'spawn'
Process = SpawnProcess # Все созданные процессы будут иметь тип SpawnProcess
BaseContext
, в свою очередь, просто реализует API как у пакета multiprocessing. Исходный код можно посмотреть тут.
Для создания процессов можно и не использовать контекст, а напрямую создавать multiprocessing.Process
. В этом случае просто используется контекст по умолчанию, который лежит в глобальной переменной _default_context
. Дефолтный контекст определяется в зависимости от типа вашей ОС. Для windows и macOS - это spawn, а для остальных - fork. Это можно наглядно увидеть в исходном коде:
if sys.platform != 'win32':
# тут объявляются классы контекстов и процессов для POSIX систем
...
_concrete_contexts = {
'fork': ForkContext(),
'spawn': SpawnContext(),
'forkserver': ForkServerContext(),
}
if sys.platform == 'darwin':
# на macOS используется spawn по умолчанию
_default_context = DefaultContext(_concrete_contexts['spawn'])
else:
# на всех остальных POSIX системах используется fork
_default_context = DefaultContext(_concrete_contexts['fork'])
else:
# тут объявляются класс контекста и процесса для windows
...
_concrete_contexts = {
'spawn': SpawnContext(),
}
# на windows используется spawn по умолчанию
_default_context = DefaultContext(_concrete_contexts['spawn'])
Внимательные читатели заметили класс DefaultContext
- этот класс контекста просто использует дефолтный для текущей ОС метод создания процессов.
Процесс (Process)
Рассмотрим самый простой пример создания процесса с помощью модуля multiprocessing и попробуем разобраться, что происходит под капотом.
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
В этом примере класс Process
принимает параметры target - функция, которую нужно запустить в дочернем процессе, и args - параметры для этой функции. Давайте посмотрим на объявление класса Process
:
class Process(process.BaseProcess):
_start_method = None
@staticmethod
def _Popen(process_obj):
return _default_context.get_context().Process._Popen(process_obj)
@staticmethod
def _after_fork():
return _default_context.get_context().Process._after_fork()
Process
является классом процесса, используемым по умолчанию, он наследуется от BaseProcess
- базового для всех процессов. Еще есть классы ForkProcess
, SpawnProcess
, ForkServerProcess
- реализации под определённый метод создания процесса. Все классы реализуют статический метод _Popen
для создания нового процесса и _after_fork
для чистки после создания дочернего процесса.
Можно заметить, что в Process
метод создания процесса не установлен. Это потому, что он берется из контекста по умолчанию. _default_context.get_context()
возвращает объект типа ForkContext
, SpawnContext
или ForkServerContext
. У контекста есть атрибут Process
, у которого уже вызываются методы _Popen
и _after_fork
, имеющие конкретную реализацию под тип запуска.
Вернемся к нашему примеру. При вызове p.start()
внутри класса процесса вызывается статический метод _Popen()
. В нём инициализируется класс Popen
(см. код ниже), который отвечает за создание процесса и взаимодействие с ним.
class ForkProcess(process.BaseProcess):
_start_method = 'fork'
@staticmethod
def _Popen(process_obj):
from .popen_fork import Popen
return Popen(process_obj)
class SpawnProcess(process.BaseProcess):
_start_method = 'spawn'
@staticmethod
def _Popen(process_obj):
from .popen_spawn_posix import Popen
return Popen(process_obj)
@staticmethod
def _after_fork():
# process is spawned, nothing to do
pass
Создание процесса через fork
Сначала рассмотрим как создаются процессы методом fork. Как мы уже знаем, при старте процесса инициализируется класс Popen
, который имеет реализацию под каждый из способов создания процессов. Для fork реализацию можно найти в файле popen_fork.py.
class Popen(object):
method = 'fork'
def __init__(self, process_obj):
util._flush_std_streams()
self.returncode = None
self.finalizer = None
# при инициализации вызывается метод _launch
self._launch(process_obj)
# другие методы
# ...
def _launch(self, process_obj):
code = 1
# создаем две пары дескриптором
parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe()
self.pid = os.fork() # создаем новый процесс
if self.pid == 0:
# в эту ветвь заходит в дочернем процессе
try:
os.close(parent_r)
os.close(parent_w)
code = process_obj._bootstrap(parent_sentinel=child_r)
finally:
os._exit(code)
else:
# в эту ветвь заходит в родительском процессе
os.close(child_w)
os.close(child_r)
self.finalizer = util.Finalize(self, util.close_fds,
(parent_r, parent_w,))
self.sentinel = parent_r
process_obj
тут - это объект процесса multiprocessing.Process
.
Для создания процесса используется функция os.fork()
, которая использует системный вызов fork. os.fork()
, как и системный вызов fork
, возвращает ноль в дочернем процессе и PID ребенка в родительском процессе. fork полностью копирует родительский процесс, поэтому выполнение программы продолжается с той же инструкции.
В дочернем процессе вызывается метод _bootstrap
объекта процесса, внутри которого как раз и вызывается нужная пользовательская функция.
Теперь мы посмотрели весь путь от p.start()
до запуска таргетной функции. Давайте резюмируем. Внутри p.start()
берется дефолтный контекст и создаётся класс Popen
, имеющий реализацию под каждый из методов создания новых процессов. В случае с fork для создания нового процесса используется функция os.fork()
, создающая копию родительского процесса. В дочернем процессе вызывается таргетная функция, а родительский процесс чистит ненужные объекты и завершает выполнение метода p.start()
.
Создание процесса через spawn
Как запускается процесс при использовании SpawnProcess
? В пакете multiprocessing есть реализации метода spawn как под windows, так и под POSIX системы. Смысл их работы один и тот же, просто используются разные интерфейсы для работы с ОС. Поэтому для простоты рассмотрим реализацию под POSIX системы. Весь код процесса почти совпадает с реализацией для fork, основное отличие в классе Popen, используемом для создания новых процессов:
class Popen(popen_fork.Popen):
method = 'spawn'
DupFd = _DupFd
def __init__(self, process_obj):
self._fds = []
super().__init__(process_obj)
# другие методы
# ...
def _launch(self, process_obj):
from . import resource_tracker
tracker_fd = resource_tracker.getfd()
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
set_spawning_popen(self)
try:
reduction.dump(prep_data, fp) # сериализация словаря с информацией о процессе
reduction.dump(process_obj, fp) # сериализация объекта процесса
finally:
set_spawning_popen(None)
parent_r = child_w = child_r = parent_w = None
try:
parent_r, child_w = os.pipe() # создаем 4 дескриптора для обмена данными
child_r, parent_w = os.pipe()
cmd = spawn.get_command_line(tracker_fd=tracker_fd,
pipe_handle=child_r)
self._fds.extend([child_r, child_w])
self.pid = util.spawnv_passfds(spawn.get_executable(),
cmd, self._fds)
self.sentinel = parent_r
with open(parent_w, 'wb', closefd=False) as f:
f.write(fp.getbuffer())
finally:
fds_to_close = []
for fd in (parent_r, parent_w):
if fd is not None:
fds_to_close.append(fd)
self.finalizer = util.Finalize(self, util.close_fds, fds_to_close)
for fd in (child_r, child_w):
if fd is not None:
os.close(fd)
process_obj
тут также является объектом процесса multiprocessing.Process
.
Разберем основные части:
prep_data = spawn.get_preparation_data(process_obj._name)
Сначала собирается информация о родительском процессе, которая потребуется дочернему процессу для десериализации объекта родителя. Эта информация включает в себя путь к интерпретируемому файлу, аргументы запуска, директорию, откуда запускался файл, и др.
reduction.dump(prep_data, fp)
reduction.dump(process_obj, fp)
Затем сериализуется словарь с информацией о родительском процессе и сам объект процесса. reduction.dump
внутри вызывает стандартный pickle.dump
. Pickle - модуль языка Python, позволяющий преобразовывать объекты языка в поток байтов (сериализовывать) и, соответственно, десериализовывать. Этот модуль используется в пакете multiprocessing для передачи Python-объектов между процессами.
cmd = spawn.get_command_line(tracker_fd=tracker_fd,
pipe_handle=child_r)
self._fds.extend([child_r, child_w])
self.pid = util.spawnv_passfds(spawn.get_executable(),
cmd, self._fds)
Дальше создается команда для запуска интепретатора, в которую передаются все нужные аргументы. Затем util.spawnv_passfds
запускает эту команду в новом процессе. Также туда передаются файловые дескрипторы, которые должны остаться открытыми в новом процессе.
with open(parent_w, 'wb', closefd=False) as f:
f.write(fp.getbuffer())
Затем в дочерний процесс передаётся сериализованная информация о процессе. В дочернем процессе десериализуется переданный объект процесса и запускается таргетная функция. Profit!
Давайте углубимся и посмотрим как именно создается процесс в функции util.spawnv_passfds
:
# Start a program with only specified fds kept open
def spawnv_passfds(path, args, passfds):
import _posixsubprocess
import subprocess
passfds = tuple(sorted(map(int, passfds)))
errpipe_read, errpipe_write = os.pipe()
try:
return _posixsubprocess.fork_exec(
args, [path], True, passfds, None, None,
-1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
False, False, -1, None, None, None, -1, None,
subprocess._USE_VFORK)
finally:
os.close(errpipe_read)
os.close(errpipe_write)
Для создания нового процесса используется метод fork-exec. Создание нового процесса выполняется двумя системными вызовами. Сначала fork создает дочерний процесс, который копирует процесс родителя. Затем в дочернем процессе вызывается системный вызов exec (на самом деле системного вызова exec нет, под этим мы будем иметь в виду семейство из нескольких подобных системных вызовов, так как их суть в одном и том же). Exec запускает новый исполняемый файл в контексте уже существующего процесса, заменяя предыдущий исполняемый файл. Таким образом, исполняемый файл меняется внутри одного процесса.
Завершение процесса
Вернемся к изначальному примеру:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Теперь мы знаем, что происходит, когда мы создаем процесс и вызываем у него метод start()
. А что происходит при вызове join()
?
При вызове join()
родительский процесс дожидается завершения дочернего. Сначала происходит несколько проверок, а затем вызывается метод wait
у уже знакомого нам класса Popen
:
class Popen(object):
# другие методы
# ...
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
# если процесс еще не завершился, то ждем
pid, sts = os.waitpid(self.pid, flag)
except OSError:
# Child process not yet created. See #1731717
# e.errno == errno.ECHILD == 10 return None
if pid == self.pid:
self.returncode = os.waitstatus_to_exitcode(sts)
return self.returncode
def wait(self, timeout=None):
# проверяем завершился ли уже процесс
if self.returncode is None:
if timeout is not None:
from multiprocessing.connection import wait
if not wait([self.sentinel], timeout):
return None
# This shouldn't block if wait() returned successfully.
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
return self.returncode
Там проверяется, завершился ли уже процесс - если да, то просто возвращается returncode
дочернего процесса. Если нет, то дожидается завершения дочернего процесса, используя функцию os.waitpid()
.
Пул процессов
Помимо процессов, пакет multiprocessing предоставляет множество полезных классов и функций для работы с процессами. Один из самых часто используемых инструментов - пул процессов. Пул (англ. Pool) позволяет распараллелить выполнение функции на множестве значений, используя для этого несколько процессов.
При создании объекта Pool
указывается количество рабочих процессов, которые будут выполнять задачи. Также ему можно передать контекст, который будет использован для запуска процессов. В рамках данной статьи нас интересуют три основных метода класса Pool
:
Pool.apply()
- вызывает функцию с аргументамиPool.apply_async()
- асинхронный вариантPool.apply()
. То естьapply_async()
не дожидается результата завершения работы функцииPool.map()
- многопроцессорный аналог встроенной функцииmap()
, которая применяет функцию к любой последовательности, поддерживающей итерирование, и возвращает список результатов работы этой функции.Pool.map_async()
- асинхронный вариантmap()
Пример использования пула процессов:
import time
from multiprocessing.pool import Pool
def wait_and_return(x):
time.sleep(1)
return x
if __name__ == "__main__":
with Pool(4) as pool:
result = pool.map(wait_and_return, [1,2,3,4])
print(result)
Программа выведет [1, 2, 3, 4]
. Но несмотря на то, что суммарное время ожидания должно равняться четырем секундам, программа отработает за одну секунду. Это связано с тем, что аргументы для функции распределяются по четырем рабочим процессам, которые выполняются параллельно.
Пул процессов имеет очередь заданий, в которую добавляются новые задания при вызове методов apply_async()
, map()
и др. Если упростить, то можно сказать, что из этой очереди рабочие процессы забирают задания. В качестве очереди используется multiprocessing.Queue
, которая позволяет безопасно передавать данные между процессами. После выполнения задания рабочий процесс складывает результат в общую для всех воркеров очередь результатов.
Разберем метод apply_async()
. Он принимает функцию с аргументами для нее и складывает в очередь заданий. Метод возвращает объект класса ApplyResult
. По своей сути данный класс имеет тип Future - инкапсулированный результат выполнения некоторой операции, которая еще не завершилась. При попытке получить результат (ApplyResult.get()
) процесс блокируется до момента, когда операция завершится и будет получен результат.
map()
принимает функцию и список аргументов, на которых необходимо запустить функцию. Метод также возвращает футуру MapResult
, результат которой можно получить через блокирующий метод MapResult.get()
. Задания разбиваются на чанки, которые отправляются в очередь с заданиями. Важно уточнить, что в очереди лежат не отдельные задания, а списки из заданий. Соответственно каждый рабочий процесс из очереди вытаскивает не одно отдельное задание, а список (чанк) заданий. Для apply_async()
размер чанка равен одному.
Давайте разберемся, как именно создаются рабочие процессы и распределяются задания. При инициализации пула создаётся три потока:
_worker_handler
- в этом потоке создаются процессы и мониторится количество текущих рабочих процессов. Когда какие-то из рабочих процессов завершаются, то этот поток создаёт новые, чтобы количество рабочих процессов всегда равнялось установленному при инициализации значению._task_handler
- в этом потоке обрабатываются задания из очереди, в которую кладутся переданные пользователем задания при вызовеapply_async()
,map()
и других методов. Затем задания из этой очереди передаются в другую очередь, которую уже обрабатывают рабочие процессы. Рабочие процессы складывают результаты в одну очередь с результатами работы._result_handler
- в этом потоке собираются результаты завершенных заданий из общей очереди и записываются в объект результатаApplyResult
(илиMapResult
).
Заключение
Я надеюсь, что данная статья помогла вам получить более глубокое понимание устройства процессов и использования пакета multiprocessing в Python. Помните, что понимание основ работы с процессами открывает новые возможности для оптимизации и ускорения выполнения ваших программ. Спасибо за внимание и удачи!
unreal_undead2
Спасибо! В принципе документация multiprocessing достаточна для использования, но интересно, как оно под капотом.
Все таки close/join тоже стоило упомянуть, без них async функциями пользоваться сложно )
boomb0om Автор
Про close/join изначально хотел написать, но в реализации особо ничего интересного там нет) Документации будет достаточно для понимания зачем нужны close и join