Привет, Хабр!

Вы запускаете многопроцессную задачу, кидаете данные в multiprocessing.Queue(), а потом вдруг замечаете... что всё тормозит. Муторно. Медленно. Местами прям отвратительно. Вы смотрите в монитор, на top, на htop, на код — и не понимаете: ну ведь должно же летать! А не летит.

Почему тормозит multiprocessing.Queue?

С виду — всё элементарно:

from multiprocessing import Process, Queue

q = Queue()

def worker(q):
    for i in range(100):
        q.put(i)

p = Process(target=worker, args=(q,))
p.start()

for _ in range(100):
    print(q.get())

p.join()

Работает. А теперь поменяйте range(100) на range(100_000) и CPU загружен, очередь захлёбывается, потребление памяти пляшет. Что пошло не так?

multiprocessing.Queue — это не просто список с блокировкой. Это обёртка над двунаправленным IPC-каналом, реализованным через multiprocessing.Pipe, который, в свою очередь, использует либо os.pipe (на Unix), либо CreateNamedPipe()/ConnectNamedPipe() на Windows. Всё это — системные вызовы, с явными пределами по скорости и буферизации.

Внутри Queue использует Connection — объект, построенный на socket-подобном интерфейсе для межпроцессной передачи. Т.е, вы по факту отсылаете байты между процессами. А значит:

  • Надо сериализовать (pickle.dumps)

  • Надо переслать (write в pipe)

  • Надо десериализовать (pickle.loads)

Где начинаются тормоза

Проблем несколько, и они компаундятся:

1. Каждый put(obj) превращает obj в байтовую строку через pickle.dumps(). Это уже не бесплатно. А если у вас внутри объект с вложенностями, ссылками, numpy-массивами или кастомными классами — сериализация становится линейно или даже экспоненциально дорогой. Пример:

import pickle
import numpy as np

x = np.random.rand(1000, 1000)
len(pickle.dumps(x))  # десятки мегабайт

Да, NumPy умеет быстро сериализоваться, но всё равно не очень. А теперь представьте, что вы таких массивов гоните сотни в секунду.

2. Стандартный os.pipe (или named pipe) — это механизм, у которого есть буфер (обычно 64 КБ на POSIX). Если producer пишет быстрее, чем consumer читает — всё стопорится. Блокировка. Или накопление в Queue._buffer, и это дополнительная прослойка.

3. Очередь не отправляет данные в Pipe напрямую из put(). Она складывает их в Queue._buffer, и отдельный поток (_feeder) отправляет их через Pipe в фоне. Это даёт возможность put() отрабатывать быстро… пока буфер не забит. Но в момент перегруза feeder не успевает — и put() начинает ждать.

multiprocessing.Queue — всегда копия, никогда zero-copy. И здесь то самое фундаментальное ограничение: очередь в multiprocessing всегда копирует данные. Не получиотся передать объект как есть. Даже numpy.ndarray, даже bytearray, всё будет сериализовано в байты, скопировано в pipe, считано, восстановлено.

Если важно не копировать данные, например, при передаче массивов, изображений, аудио — Queue просто не ваш инструмент.

Даже если вы передаёте числа или строки, проблема остаётся:

  • pickle на каждый объект

  • write() на каждый объект

  • read() на каждый объект

  • передача между потоками/процессами

  • фоновый поток feeder'а + буферная очередь

А если у вас ещё и несколько consumer-ов или producer-ов, то: появляются блокировки, contention на уровне feeder'а, утилизируются потоки, а не ядра.

SharedMemory: спасение от пайпов и pickle

shared_memory — это прямой доступ к общей памяти, без сериализации, без промежуточных pipe, без GIL-локов на каждом put().

Введён он был в Python 3.8, и дал то, о чём давно мечтали: возможность шарить байты между процессами на уровне ядра, без overhead.

Как это вообще работает

multiprocessing.shared_memory.SharedMemory это не просто Python-объект. Он создаёт POSIX shared memory сегмент (на Linux/Unix через shm_open(), на Windows — через CreateFileMapping()).

Базовая логика:

  • Один процесс создаёт сегмент общей памяти: SharedMemory(create=True)

  • Получает уникальное имя: например, 'psm_f4e245c2'

  • Передаёт это имя в другой процесс (через multiprocessing.Queue, файл, сокет, неважно)

  • Второй процесс подключается к той же памяти по имени: SharedMemory(name='psm_f4e245c2')

  • Всё. Они видят один и тот же блок байтов

Пример:

# процесс 1
from multiprocessing import shared_memory
import numpy as np

arr = np.arange(10_000, dtype=np.float64)
shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
shm_array = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
shm_array[:] = arr[:]

print(f"Shared memory created: {shm.name}")
# процесс 2
from multiprocessing import shared_memory
import numpy as np

existing = shared_memory.SharedMemory(name='psm_f4e245c2')
arr = np.ndarray((10_000,), dtype=np.float64, buffer=existing.buf)
print(arr[5000])  # работает

Без pickle, без copy, без pipe.

Производительная очередь на SharedMemory

Допустим, хочется реализовать очередь без сериализации, но с безопасным доступом. Тип данных — float64 для простоты. Алгоритм — кольцевой буфер.

Архитектура ShmQueue:

from multiprocessing import shared_memory, Value, Lock
import numpy as np

class ShmQueue:
    def __init__(self, size, dtype=np.float64):
        self.size = size
        self.dtype = dtype
        self.itemsize = np.dtype(dtype).itemsize
        self.shm = shared_memory.SharedMemory(create=True, size=self.size * self.itemsize)
        self.buffer = np.ndarray((size,), dtype=dtype, buffer=self.shm.buf)

        self.head = Value('i', 0)  # где читать
        self.tail = Value('i', 0)  # где писать
        self.lock = Lock()

    def put(self, item):
        with self.lock:
            next_tail = (self.tail.value + 1) % self.size
            if next_tail == self.head.value:
                raise BufferError("Очередь переполнена")
            self.buffer[self.tail.value] = item
            self.tail.value = next_tail

    def get(self):
        with self.lock:
            if self.head.value == self.tail.value:
                raise BufferError("Очередь пуста")
            item = self.buffer[self.head.value]
            self.head.value = (self.head.value + 1) % self.size
            return item

    def name(self):
        return self.shm.name

    def close(self):
        self.shm.close()

    def unlink(self):
        self.shm.unlink()

Value — это обёртка над ctypes, которая делает целочисленные счётчики head и tail доступными из других процессов. Без них вы не синхронизируете указатели чтения/записи.

Семафор Lock нужен для защиты от гонок между put() и get() при доступе к буферу. Если нужно больше производительности, можно уйти в lock-free реализацию (atomic CAS с ctypes или даже numba), но это уже другой лвл.

Передача больших данных: массивы, изображения, фреймы

Что делать, если вы не просто передаёте float64, а полноценный фрейм — например, RGB-изображение 1920x1080. Передавайте только метаинформацию, а сами данные кладите в отдельный сегмент SharedMemory.

Паттерн: мета + данные:

В один shm-сегмент кладёте байты данных (image.tobytes() или np.ndarray). Во втором (или в очереди) передаёте:

{
  "name": "psm_123456",
  "shape": [1080, 1920, 3],
  "dtype": "uint8"
}

Это можно передавать как JSON, либо как ctypes.Struct, либо как сериализованный заголовок. У вас полная свобода.

Пример: zero-copy producer -> worker

Представим пайплайн:

  1. Producer получает 10k изображений

  2. Гонит их в worker-ов на обработку

  3. Важно не копировать, не сериализовать

Пример:

# Producer
data = np.random.randint(0, 255, (1080, 1920, 3), dtype=np.uint8)
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
buf = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
buf[:] = data[:]

meta = {
    "name": shm.name,
    "shape": data.shape,
    "dtype": str(data.dtype)
}
queue.put(meta)
# Worker
meta = queue.get()
shm = shared_memory.SharedMemory(name=meta["name"])
data = np.ndarray(meta["shape"], dtype=meta["dtype"], buffer=shm.buf)

# process image...

shm.close()
shm.unlink()

Нюансы (куда без них)

Нужно вручную освобождать память

Когда вы создаёте SharedMemory(create=True), Python под делает системный вызов shm_open() или CreateFileMapping(). Появляется файл в /dev/shm. Закрытие .close() — это просто отклеиться от памяти. Но сегмент физически останется жить, пока вы не вызовете .unlink(). Если не сделать — память утечёт. Да, даже после завершения процесса.

Поэтому всегда: .close().unlink(). Лучше — с try/finally, или через atexit.

Нет защиты от гонок

shared_memory — это просто сырые байты. Никаких блокировок, никаких гарантий. Если один процесс пишет, а второй в этот момент читает — всё, вы в гонке. И никто вас не спасёт: не будет ни исключения, ни ворнинга.

Решается просто: multiprocessing.Lock(), Value(). Можно еще накинуть CAS, атомики, но дефолтно никакой защиты нет.

Передавать можно только байты

Забудьте про dict, list, кастомные классы и прочие удобства. SharedMemory ничего не знает о Python-объектах. Всё, что вы передаёте — это блок памяти. Вы обязаны сами решать, как в нём интерпретировать данные.

То есть: хотите строку, кодируйте вручную. Хотите сложный объект, заводите отдельный сегмент под JSON. Хотите батч картинок, передавайте shape, dtype, name сегмента.

Сегменты не исчезают при крэше

Если процесс падает (ошибка, SIGKILL, падение ядра — неважно), и вы не вызвали unlink(), сегмент остаётся висеть. Вы не видите это из Python, но система видит. И когда таких сегментов накапливается — у вас начинаются странные проблемы.

Решение одно: чистить за собой. Либо вручную, либо делать watchdog, который глянет на /dev/shm и сносит старое.

Если у вас есть опыт работы с multiprocessing.shared_memory — делитесь своим опытом комментариях.


А если вам приходилось масштабировать систему под реальные нагрузки, вы наверняка сталкивались с тем, что проблемы начинаются не в коде, а в архитектуре: очередь упирается в bottleneck, один из дата-центров проседает, а «умный» балансировщик внезапно становится SPOF.

Про пайпы и сериализацию мы сегодня поговорили, но это — только один слой. А что, если нужно, чтобы сервис стабильно работал в любой точке мира, быстро, отказоустойчиво и предсказуемо?

17 июня в 20:00 будет открытый урок про балансировку и геораспределение в Highload-системах. Обсудим, как крупные сервисы распределяют трафик между регионами, какие ошибки чаще всего ломают прод, и почему latency в 30 мс иногда важнее, чем SLA в 99.99%.

Регистрация открыта на странице курса «Highload Architect».

Комментарии (0)