Привет. Меня зовут Денис Лисовик, я Backend-инженер в команде Data Science SWAT Авито. Наша команда занимается разработкой вокруг задач машинного обучения.

Для максимальной утилизации GPU-ресурсов мы используем свою опенсорс-библиотеку Aqueduct. Aqueduct позволяет выполнять тяжелые операции в асинхронном контексте без блокирования event loop и унифицирует для нас подход к разработке стандартных Data Science/ML сценариев. Немного про Aqueduct мы уже поговорили вот тут.

В этой статье рассказываем, как используем разделяемую память в Aqueduct и как пришли к итоговому решению. Вместе с вами мы шаг за шагом пройдем от сервиса, который едва держит один RPS, до сервиса, который может держать сотни запросов в секунду. В процессе вы узнаете, как использовать разделяемую память и как сделать так, чтобы она не утекала, а приложение не падало с Segmentation fault.

Что внутри статьи:

Сервис разпознавания изображений

Multiprocessing

Разделяемая память

«Владение» разделяемой памятью

Счетчик ссылок

Атомарный счетчик ссылок

Заключение

Сервис распознавания изображений

Предположим, что у нас есть сервис для распознавания изображений животных. На вход сервис получает байты изображения, а на выходе возвращает текст с названием животного.

Общее представление сервиса
Общее представление сервиса

Коллеги – Data Science-инженеры – сделали модель и передали нам для интеграции. Перед выкаткой в production мы хотим проверить, какую нагрузку будет держать сервис. Проводим нагрузочное и понимаем, что сервис едва держит 1 RPS.

Если мы посмотрим на загрузку CPU во время теста, то увидим, что одно ядро используется на 100%, а остальные простаивают. Очевидно, что нужно загрузить все ядра процессора, чтобы утилизировать все ресурсы и увеличить пропускную способность сервиса.

Опишем примерную структуру сервиса. Он будет состоять из:

  • главного процесса – асинхронный web-сервер, который принимает на вход изображения;

  • набора дочерних процессов, которые будут заниматься обработкой и распознаванием.

Структура сервиса
Структура сервиса

Можно сразу задать вопрос: почему бы не поднять несколько инстансов нашего сервиса и не поставить какой-то балансировщик перед ними?

Несколько контейнеров с балансировщиком нагрузки
Несколько контейнеров с балансировщиком нагрузки

В общем случае это сработает, но когда появляются видеокарты – все становится немного сложнее.

Первый пример:

У нас может быть один инстанс сервиса, который потребляет 10 Гб видео-памяти. Если собирать картинки в батч, например, по три штуки, то сервис будет отвечать быстрее и эффективнее, чем если бы они обрабатывались последовательно.

Если же мы поднимем три инстанса, которые к тому же могут быть распределены kubernetes на ноду с одной и той же видеокартой, то мы суммарно потратим 30 Гб видеопамяти. Батч не будет успевать набираться, хотя производительность сервиса в общем будет такой же, как и в первом случае, но в итоге мы потратим больше ресурсов видеопамяти.

Второй пример:

Работа с изображениями часто подразумевает несколько CPU-bound этапов обработки: ресайз, кроп, перевод в grayscale, и так далее.

Эти этапы могут быть долгими по сравнению с шагом обработки на видеокарте, поэтому мы можем улучшить производительность, если будем выполнять их в параллельном режиме и, например, передавать их одним батчом.

Эти этапы не хотелось бы разносить по разным сервисам, потому что это увеличит общее время ответа, усложнит систему и даст дополнительную нагрузку на сеть.

Multiprocessing

Вернемся к нашей изначальной идее распараллелить обработку изображений.

Воспользуемся ProcessPoolExecutor из стандартной библиотеки python.

from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor(max_workers=4)

def detect_image(image: bytes) -> str:
    # cpu-bound task
    return 'cat'

async def http_handler(request: Request) -> Response:
    image = await request.read()
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, detect_image, image)
    return Response(result)
Сервис с multiprocessing
Сервис с multiprocessing

Проверяем, и все отлично работает. Задачи теперь распределяются по всем ядрам. Проводим нагрузочное и получаем хорошие результаты.

Теперь попробуем обрабатывать большие изображения по мегабайту и больше. Мы заметим серьезную просадку в нашей пропускной способности сервиса.

Почему так происходит?

Все дело в том, что ProcessPoolExecutor запускает несколько отдельных процессов, и этим процессам нужно как-то общаться друг с другом или с главным процессом. Для этого multiprocessing использует очередь Queue, под капотом у которой используется Pipe. Через Pipe можно передавать только байты, поэтому все данные предварительно нужно сериализовать. Для всех данных сначала вызывается pickle, потом данные передаются в другой процесс и вызывается unpickle.

pickle/unpickle между процессами
pickle/unpickle между процессами

В итоге мы тратим много времени на сериализацию, десериализацию и копирование данных между процессами.

Разделяемая память

Разделяемая память (shared memory) – это общий блок памяти, к нему может получить доступ любой процесс, у которого есть идентификатор этого блока. При этом несколько процессов или потоков могут одновременно обращаться к этой памяти. Это позволяет эффективно обмениваться данными без необходимости использовать сложные механизмы коммуникации.

Разделяемая память
Разделяемая память

Хорошая новость в том, что python поддерживает разделяемую память из коробки в стандартной библиотеке.

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import shared_memory

executor = ProcessPoolExecutor(max_workers=4)

def detect_image(shm: shared_memory.SharedMemory) -> str:
    # read image from shared memory
    image = shm.buf
    # cpu-bound task ...
    return 'cat'

async def http_handler(request: Request) -> Response:
    loop = asyncio.get_running_loop()
    image = await request.read()

    # store image to shared memory
    shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(image))
    shm.buf[:len(image)] = image
    result = await loop.run_in_executor(executor, detect_image, shm)
    return Response(result)

На первый взгляд работа с разделяемой памятью достаточно проста. Импортируем и  создаем вручную объект SharedMemory, указав флаг create=True и размер необходимой памяти. Далее вручную заполняем буфер SharedMemory байтами изображения.

После инстанс SharedMemory можно передать в подпроцесс и передаваться будет не само изображение, а только ссылка на него, причем происходить это будет мгновенно.

Если мы попробуем запустить этот сервис, то довольно быстро увидим, что у нас утекает память. Проблема в том, что Garbage Collector не управляет жизненным циклом разделяемой памяти, и мы должны сами следить за выделением и удалением.

Смотрим документацию python и исправляем ошибку.

...

def detect_image(shm: shared_memory.SharedMemory) -> str:
    image = shm.buf
    # cpu-bound task ...
    result = 'cat'
    shm.close()
    return result

async def http_handler(request: Request) -> Response:
    loop = asyncio.get_running_loop()
    image = await request.read()

    # store image to shared memory
    shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(image))
    shm.buf[:len(image)] = image
    result = await loop.run_in_executor(executor, detect_image, shm)
    shm.close()
    shm.unlink()
    return Response(result)

Чтобы очистить разделяемую память необходимо вызвать методы close() и unlink().

  • метод close() вызывается в каждом процессе, когда процесс заканчивает работу с разделяемой памятью;

  • метод unlink() вызывается один раз в последнем процессе.

Если теперь запустить сервис, мы увидим, что утечек памяти нет и сервис работает хорошо.

Такой подход можно использовать для простых сценариев, но проблема в том, что много всего нужно писать руками. При большой кодовой базе можно легко запутаться, что-то где-то не удалить или, наоборот, удалить лишний раз.

Если пойти искать решение данной проблемы, то в документации python можно найти класс SharedMemoryManager.

Этот класс будет следить за жизненным циклом разделяемой памяти и очищать ее в нужный момент. Сам класс можно использовать в виде контекстного менеджера, чтобы не нужно было вызывать методы start() и shutdown() вручную. Под капотом этот класс запускает отдельный процесс, который будет единственным источником правды для управления блоками разделяемой памяти и сам будет вызывать в нужный момент метод unlink().

from concurrent.futures import ProcessPoolExecutor
from multiprocessing.managers import SharedMemoryManager

def detect_image(shm) -> str:
    image = shm.buf
    # cpu-bound task ...
    return 'cat'

async def http_handler(request: Request) -> Response:
    loop = asyncio.get_running_loop()
    image = await request.read()

    # store image to shared memory
    with SharedMemoryManager() as manager:
        shm = manager.SharedMemory(size=len(image))
        shm.buf[:len(image)] = image
        result = await loop.run_in_executor(executor, detect_image, shm)

    return Response(result)

У такого подхода есть несколько минусов:

  • SharedMemoryManager создает отдельный процесс, с которым все также приходится общаться по средствам Pipe или сетевого сокета, что ухудшает производительность;

  • память почистить можно только в том процессе, который создал SharedMemoryManager. То есть там где мы выполнили метод start() или создали контекстный менеджер.

В итоге для простых сценариев этот метод подходит и позволяет самому не управлять памятью, но для более сложных сценариев этот способ не подойдет.

«Владение» разделяемой памятью

Чтобы написать свой менеджер для управления разделяемой память, воспользуемся идеей владения (ownership) ресурсом.

Начнем с примера.

У нас есть 2 процесса – основной и подпроцесс обработчик. Также есть очередь, через которую они общаются. Процесс-обработчик не возвращает картинку, а отдает строку с ответом.

Для этого будем считать, что если мы передали блок разделяемой памяти в очередь (то есть передали в другой процесс), то мы теряем владение над этой памятью. Если мы не владеем этой памятью, то когда вызовется деструктор __del__, мы просто ничего не делаем.

Если же мы владеем этой памятью, то мы можем вручную вызвать метод unlink(), так как владение гарантирует нам, что память не будет использоваться в другом процессе.

Обработка задачи с «владением» данными
Обработка задачи с «владением» данными

Напишем класс SharedMemoryWrapper

from multiprocessing import shared_memory

class SharedMemoryWrapper:
    def __init__(self, data: bytes) -> None:
        self._shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(data))
        self._owner = True
        self._shm.buf[:len(data)] = data

    @property
    def buf(self) -> memoryview:
        return self._shm.buf

    def __getstate__(self) -> dict[str, Any]:
        self._owner = False
        return {'name': self._shm.name}

    def __setstate__(self, state: dict[str, Any]) -> None:
        self._shm = shared_memory.SharedMemory(name=state['name'], create=False)
        self._owner = True

    def __del__(self) -> None:
        self._shm.close()
        if self._owner:
            self._shm.unlink()

Рассмотрим по шагам:

  1. добавляем флаг _owner=True в наш wrapper;

  2. изначально выставляем его в True;

  3. когда мы передаем его в очередь, сбрасываем _owner=False;

  4. когда принимаем его из очереди, выставляем _owner=True;

  5. если GC сработал в подпроцессе обработчике, то вызывается __del__. Флаг _owner == True - мы владеем объектом и поэтому удаляем его, вызвав unlink();

  6. главный процесс отдал объект и потерял над ним владение. Он проверяет этот же флаг _owner == False, и поэтому вызывает только метод close().

Здесь мы использовали методы __getstate__ и __setstate__, чтобы завязаться на события отправки и получения из очереди. По факту эти методы вызываются при pickle и unpickle.

В итоге наш сервис будет выглядеть примерно так:

from concurrent.futures import ProcessPoolExecutor

def detect_image(shm: SharedMemoryWrapper) -> str:
    image = shm.buf
    # cpu-bound task ...
    return 'cat'

async def http_handler(request: Request) -> Response:
    loop = asyncio.get_running_loop()
    image = await request.read()
    shm = SharedMemoryWrapper(image)
    result = await loop.run_in_executor(executor, detect_image, shm)
    return Response(result)

Нам не нужно вручную управлять памятью или завязываться на SharedMemoryManager. Кода нужно писать меньше и он понятнее.

И все было бы хорошо, но такой код работает не во всех случаях. Этот вариант работает только тогда, когда мы гарантированно знаем, что объект с разделяемой памятью пересылается только в один процесс одновременно.

Кому очищать разделяемую память — не понятно
Кому очищать разделяемую память — не понятно

Если мы хотим отправить одну картинку одновременно в несколько подпроцессов, каждый из которых делает какую-то свою задачу, например, определяет животное, породу и цвет, то в этом случае мы не сможем использовать этот подход, потому что не понятно, кто владеет разделяемой памятью и кому нужно очищать ее.

Для такого более сложного случая нужно реализовать счетчик ссылок RC.

Счетчик ссылок

В разделяемой памяти можно хранить не только наши данные, но и метаданные. В нашем случае мы можем хранить счетчик ссылок. Для примера будем хранить RC в последнем дополнительном байте нашей разделяемой памяти. Наш флаг _owner мы заменим на счётчик и будем его инкрементировать и декрементировать. Когда он будет равен нулю, мы точно будем знать, что можем удалить этот объект.

from multiprocessing import shared_memory

class SharedMemoryWrapper:
    def __init__(self, data: bytes) -> None:
        self._shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(data) + 1)
        self._shm.buf[:len(data)] = data
        self._shm.buf[-1] = 1  # reference counter

    @property
    def buf(self) -> memoryview:
        return self._shm.buf

    def __getstate__(self) -> dict[str, Any]:
        self._shm.buf[-1] += 1
        return {'name': self._shm.name}

    def __setstate__(self, state: dict[str, Any]) -> None:
        self._shm = shared_memory.SharedMemory(name=state['name'], create=False)

    def __del__(self) -> None:
        self._shm.buf[-1] -= 1
        rc = self._shm.buf[-1]
        self._shm.close()
        if rc == 0:
            self._shm.unlink()

Если мы запустим эту версию, то получим Segmentation Fault. Наше приложение либо будет падать, либо мы будем иногда получать утечки памяти.

Конкурентный доступ к счетчику. Утечка памяти
Конкурентный доступ к счетчику. Утечка памяти

Здесь происходит стандартная гонка (race condition). Каждый из процессов сначала получает текущее значение счетчика ссылок, а потом его увеличивает или уменьшает. При этом другие процессы параллельно делают тоже самое. В итоге мы либо освободим память несколько раз и приложение упадёт, либо не освободим и у нас будет утечка.

Логичное решение этой проблемы использовать блокировку на изменение счетчика ссылок. Реализовать это можно в виде глобального общего словаря, который будет хранить счетчик ссылок для каждого блока разделяемой памяти, а изменение счетчика будет происходить через блокировку.

from multiprocessing import Manager, Lock

COUNTERS = Manager().dict()

class GlobalRC:
    def __init__(self) -> None:
        self._counters = COUNTERS
        self._lock = Lock()

    def incr(self, shm_name: str) -> None:
        with self._lock:
            if shm_name not in self._counters:
                self._counters[shm_name] = 0
            self._counters[shm_name] += 1

    def decr(self, shm_name: str) -> None:
        with self._lock:
            self._counters[shm_name] -= 1
            curr = self._counters[shm_name]
            if curr == 0:
                self._counters.pop(shm_name)
        return curr

Тут мы используем Lock из модуля multiprocessing, который предоставляет блокировку, чтобы гарантировать, что в любой момент времени только один процесс будет изменять счетчик ссылок.

Для того, чтобы словарь был глобальным для всех процессов, мы используем Manager().dict(). Manager создает отдельный процесс для работы с глобальным словарем, который будет доступен всем процессам.

Все это позволит нам добиться стабильного кода и ошибок памяти больше не будет.

Попробуем провести нагрузочное еще раз.

Глобальный общий словарь и baseline без блокировок
Глобальный общий словарь и baseline без блокировок

Зеленый график – это baseline, когда мы не используем общий словарь и блокировки, но работа с разделяемой памятью не безопасна.

Красный график – это версия с использованием глобального общего словаря и блокировок.

По графику видно, что на относительно небольшой нагрузке мы работаем почти как baseline. При росте нагрузки версия со словарем начинает страдать, появляются всплески времени ответа, и сервис ведет себя непредсказуемо.

В версии с общим словарем у нас получилось что-то похожее на свой локальный GIL. Также на скорость влияет, что словарь созданный через Manager и поэтому он живет в отдельном процессе и с ним нужно общаться через socket или pipe с сериализацией и десериализацией.

Атомарный счетчик ссылок

Альтернативный способ свести время блокировок к минимуму — это использовать Атомики. Это набор атомарных CPU-инструкций, которые позволяют изменять значение области памяти в конкурентной среде, где за эту память могут бороться несколько ядер CPU, эксклюзивно и без гонок.

Тут мы встречаем первое место, где в стандартной библиотеке python нет решения нашей проблемы. Чтобы воспользоваться атомиками, нам придется написать немного C-кода. Для этого мы будем использовать библиотеку cffi. Эта библиотека позволяет налету прямо в рантайме скомпилировать C код и подключить его в python-приложение.

Всего нам понадобится четыре операции: установить и загрузить значение, увеличить значение и получить результат, уменьшить значение и получить результат.

atomic = ffi.verify("""
uint32_t load_uint32(uint32_t *v) {
    return __atomic_load_n(v, __ATOMIC_SEQ_CST);
};
void store_uint32(uint32_t *v, uint32_t n) {
    uint32_t i = n;
    __atomic_store(v, &i, __ATOMIC_SEQ_CST);
};
uint32_t add_and_fetch_uint32(uint32_t *v, uint32_t i) {
    return __atomic_add_fetch(v, i, __ATOMIC_SEQ_CST);
};
uint32_t sub_and_fetch_uint32(uint32_t *v, uint32_t i) {
    return __atomic_sub_fetch(v, i, __ATOMIC_SEQ_CST);
};
""")

Теперь эти низкоуровневые операции мы можем обернуть в удобный класс атомарный счетчик AtomicCounter

class AtomicCounter:
    def __init__(self, view: memoryview):
        self._ptr = ffi.cast('uint32_t*', ffi.from_buffer(view[:self.size()]))

    def get(self):
        return atomic.load_uint32(self._ptr)

    def set(self, n):
        return atomic.store_uint32(self._ptr, n)

    def inc(self):
        return atomic.add_and_fetch_uint32(self._ptr, 1)

    def dec(self):
        return atomic.sub_and_fetch_uint32(self._ptr, 1)

    @staticmethod
    def size():
        return ffi.sizeof('uint32_t')

Далее мы встраиваем этот класс в наш SharedMemoryWraper

class SharedMemoryWrapper:
    def __init__(self, size: int, shm: shared_memory.SharedMemory = None, shm_name: str = None):
        self._shm: shared_memory.SharedMemory = None  # noqa

        if shm is None and shm_name is None:
            rc_size = AtomicCounter.size()
            shm = shared_memory.SharedMemory(create=True, size=size + rc_size)
            self._rc = AtomicCounter(shm.buf)
            self._rc.set(1)

        self._attach_shm(shm, shm_name)
        self._shm_name = self._shm.name

    @property
    def buf(self) -> memoryview:
        return self._shm.buf[AtomicCounter.size():]

    @property
    def ref_count(self) -> int:
        return self._rc.get()

    def _attach_shm(self, shm: shared_memory.SharedMemory = None, shm_name: str = None):
        if shm is None:
            shm = shared_memory.SharedMemory(shm_name)
        self._shm = shm
        self._shm_name = shm.name

    def __getstate__(self) -> dict:
        self._rc.inc()
        return state

    def __setstate__(self, state: dict):
        self._attach_shm(shm_name=self._shm_name)
        self._rc = AtomicCounter(self._shm.buf)

    def __del__(self):
        curr_rc = self._rc.dec()
        if curr_rc == 0:
            self._shm.unlink()

Проводим нагрузочное

Теперь по графику видно, что график сервиса с использованием атомарного счетчика сравним с baseline графиком и больше нет непредсказуемых скачков времени ответа.

В итоге у себя в Aqueduct мы используем именно этот подход для работы с разделяемой памятью.

У этого подхода есть и минус. Во время работы сервиса процессы могут упасть из-за какой-то непредвиденной ошибки или операционная система может их убить, например, если они использовали слишком много памяти. При этом они не смогут декриментить счетчик ссылок.

Если продолжить дальше работать в таком состоянии, например попробовать перезапустить упавший процесс, то у нас могут возникнуть утечки памяти или segmentation fault при попытке лишний раз очистить память.

У себя в Aqueduct мы решили, что при падении любого подпроцесса будем завершать все подпроцессы и работу всего пайплайна. После этого наш метод is_running возвращает False и при попытке отправить в обработку новые задания мы бросаем исключение NotRunningError.

Далее приложение определяет, что Aqueduct больше не работает и успешно завершается, а инфраструктура (kubernetes, systemd, supervisord) перезапустит приложение.

Заключение

В статье мы рассмотрели несколько способов как можно работать с разделяемой памятью в python. Каждый способ может быть полезным в той или иной задаче.

Мы начали с простого сервиса, который в синхронном режиме в один поток прогонял картинку и едва держал 1 RPS. Добавив три дочерних процесса, разделяемую память и атомарные счетчики, у нас получилось разогнать наш сервис до 900 RPS.

Все это мы делали вручную, используя стандартную библиотеку python. Каждый раз такие пайплайны писать долго и их сложно тестировать. Еще и все усложняется, если шагов в пайплайне больше одного.

Для решения подобных задач можно использовать Aqueduct, потому что он предоставляет все выше описанные возможности из коробки.

Есть оптимизации, которые непосредственно влияют на производительность сервиса:

  • Батчирование данных - ускоряет обработку и максимально утилизирует GPU, потому что данные отправляются на видео карту одновременно одной пачкой, а не последовательно одно за другим

  • Разделяемая память - меньше копирований данных, сериализаций и десериализаций между процессами.

Aqueduct – это готовый фреймворк для создания мультипроцесcинговых пайплайнов обработки данных, который обеспечивает:

  • Прозрачность организации кода в небольших подпроцесах - шагах (FlowStep)

  • Описание практически в декларативном виде пайплайна обработки данных (FLow)

  • Управление мультипроцесcингом - создание подпроцессов на каждый шаг пайплайна и организация их жизненного цикла и взаимодействия

  • Сбор метрик работы пайплайна.

Как использовать атомарный счетчик ссылок можно посмотреть у нас в Aqueduct тут.

Вопросы и отзывы о Aqueduct очень ждем в комментариях к статье! Спасибо вам за уделенное время, надеюсь, что наш опыт и наработки будут вам полезны.

Комментарии (2)


  1. parus-lead
    18.12.2024 12:12

    Очень интересная статья! Спасибо за подробности. Очень понравилось выделение в отдельный класс AtomicCounter.


  1. Hazzari
    18.12.2024 12:12

    Отличная статья! спасибо!