Привет. Меня зовут Денис Лисовик, я Backend-инженер в команде Data Science SWAT Авито. Наша команда занимается разработкой вокруг задач машинного обучения.
Для максимальной утилизации GPU-ресурсов мы используем свою опенсорс-библиотеку Aqueduct. Aqueduct позволяет выполнять тяжелые операции в асинхронном контексте без блокирования event loop и унифицирует для нас подход к разработке стандартных Data Science/ML сценариев. Немного про Aqueduct мы уже поговорили вот тут.
В этой статье рассказываем, как используем разделяемую память в Aqueduct и как пришли к итоговому решению. Вместе с вами мы шаг за шагом пройдем от сервиса, который едва держит один RPS, до сервиса, который может держать сотни запросов в секунду. В процессе вы узнаете, как использовать разделяемую память и как сделать так, чтобы она не утекала, а приложение не падало с Segmentation fault.
Что внутри статьи:
Сервис разпознавания изображений
«Владение» разделяемой памятью
Сервис распознавания изображений
Предположим, что у нас есть сервис для распознавания изображений животных. На вход сервис получает байты изображения, а на выходе возвращает текст с названием животного.
Коллеги – Data Science-инженеры – сделали модель и передали нам для интеграции. Перед выкаткой в production мы хотим проверить, какую нагрузку будет держать сервис. Проводим нагрузочное и понимаем, что сервис едва держит 1 RPS.
Если мы посмотрим на загрузку CPU во время теста, то увидим, что одно ядро используется на 100%, а остальные простаивают. Очевидно, что нужно загрузить все ядра процессора, чтобы утилизировать все ресурсы и увеличить пропускную способность сервиса.
Опишем примерную структуру сервиса. Он будет состоять из:
главного процесса – асинхронный web-сервер, который принимает на вход изображения;
набора дочерних процессов, которые будут заниматься обработкой и распознаванием.
Можно сразу задать вопрос: почему бы не поднять несколько инстансов нашего сервиса и не поставить какой-то балансировщик перед ними?
В общем случае это сработает, но когда появляются видеокарты – все становится немного сложнее.
Первый пример:
У нас может быть один инстанс сервиса, который потребляет 10 Гб видео-памяти. Если собирать картинки в батч, например, по три штуки, то сервис будет отвечать быстрее и эффективнее, чем если бы они обрабатывались последовательно.
Если же мы поднимем три инстанса, которые к тому же могут быть распределены kubernetes на ноду с одной и той же видеокартой, то мы суммарно потратим 30 Гб видеопамяти. Батч не будет успевать набираться, хотя производительность сервиса в общем будет такой же, как и в первом случае, но в итоге мы потратим больше ресурсов видеопамяти.
Второй пример:
Работа с изображениями часто подразумевает несколько CPU-bound этапов обработки: ресайз, кроп, перевод в grayscale, и так далее.
Эти этапы могут быть долгими по сравнению с шагом обработки на видеокарте, поэтому мы можем улучшить производительность, если будем выполнять их в параллельном режиме и, например, передавать их одним батчом.
Эти этапы не хотелось бы разносить по разным сервисам, потому что это увеличит общее время ответа, усложнит систему и даст дополнительную нагрузку на сеть.
Multiprocessing
Вернемся к нашей изначальной идее распараллелить обработку изображений.
Воспользуемся ProcessPoolExecutor
из стандартной библиотеки python.
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(max_workers=4)
def detect_image(image: bytes) -> str:
# cpu-bound task
return 'cat'
async def http_handler(request: Request) -> Response:
image = await request.read()
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, detect_image, image)
return Response(result)
Проверяем, и все отлично работает. Задачи теперь распределяются по всем ядрам. Проводим нагрузочное и получаем хорошие результаты.
Теперь попробуем обрабатывать большие изображения по мегабайту и больше. Мы заметим серьезную просадку в нашей пропускной способности сервиса.
Почему так происходит?
Все дело в том, что ProcessPoolExecutor
запускает несколько отдельных процессов, и этим процессам нужно как-то общаться друг с другом или с главным процессом. Для этого multiprocessing
использует очередь Queue, под капотом у которой используется Pipe. Через Pipe можно передавать только байты, поэтому все данные предварительно нужно сериализовать. Для всех данных сначала вызывается pickle
, потом данные передаются в другой процесс и вызывается unpickle
.
В итоге мы тратим много времени на сериализацию, десериализацию и копирование данных между процессами.
Разделяемая память
Разделяемая память (shared memory) – это общий блок памяти, к нему может получить доступ любой процесс, у которого есть идентификатор этого блока. При этом несколько процессов или потоков могут одновременно обращаться к этой памяти. Это позволяет эффективно обмениваться данными без необходимости использовать сложные механизмы коммуникации.
Хорошая новость в том, что python поддерживает разделяемую память из коробки в стандартной библиотеке.
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import shared_memory
executor = ProcessPoolExecutor(max_workers=4)
def detect_image(shm: shared_memory.SharedMemory) -> str:
# read image from shared memory
image = shm.buf
# cpu-bound task ...
return 'cat'
async def http_handler(request: Request) -> Response:
loop = asyncio.get_running_loop()
image = await request.read()
# store image to shared memory
shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(image))
shm.buf[:len(image)] = image
result = await loop.run_in_executor(executor, detect_image, shm)
return Response(result)
На первый взгляд работа с разделяемой памятью достаточно проста. Импортируем и создаем вручную объект SharedMemory
, указав флаг create=True
и размер необходимой памяти. Далее вручную заполняем буфер SharedMemory
байтами изображения.
После инстанс SharedMemory
можно передать в подпроцесс и передаваться будет не само изображение, а только ссылка на него, причем происходить это будет мгновенно.
Если мы попробуем запустить этот сервис, то довольно быстро увидим, что у нас утекает память. Проблема в том, что Garbage Collector не управляет жизненным циклом разделяемой памяти, и мы должны сами следить за выделением и удалением.
Смотрим документацию python и исправляем ошибку.
...
def detect_image(shm: shared_memory.SharedMemory) -> str:
image = shm.buf
# cpu-bound task ...
result = 'cat'
shm.close()
return result
async def http_handler(request: Request) -> Response:
loop = asyncio.get_running_loop()
image = await request.read()
# store image to shared memory
shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(image))
shm.buf[:len(image)] = image
result = await loop.run_in_executor(executor, detect_image, shm)
shm.close()
shm.unlink()
return Response(result)
Чтобы очистить разделяемую память необходимо вызвать методы close()
и unlink()
.
метод
close()
вызывается в каждом процессе, когда процесс заканчивает работу с разделяемой памятью;метод
unlink()
вызывается один раз в последнем процессе.
Если теперь запустить сервис, мы увидим, что утечек памяти нет и сервис работает хорошо.
Такой подход можно использовать для простых сценариев, но проблема в том, что много всего нужно писать руками. При большой кодовой базе можно легко запутаться, что-то где-то не удалить или, наоборот, удалить лишний раз.
Если пойти искать решение данной проблемы, то в документации python можно найти класс SharedMemoryManager
.
Этот класс будет следить за жизненным циклом разделяемой памяти и очищать ее в нужный момент. Сам класс можно использовать в виде контекстного менеджера, чтобы не нужно было вызывать методы start()
и shutdown()
вручную. Под капотом этот класс запускает отдельный процесс, который будет единственным источником правды для управления блоками разделяемой памяти и сам будет вызывать в нужный момент метод unlink()
.
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.managers import SharedMemoryManager
def detect_image(shm) -> str:
image = shm.buf
# cpu-bound task ...
return 'cat'
async def http_handler(request: Request) -> Response:
loop = asyncio.get_running_loop()
image = await request.read()
# store image to shared memory
with SharedMemoryManager() as manager:
shm = manager.SharedMemory(size=len(image))
shm.buf[:len(image)] = image
result = await loop.run_in_executor(executor, detect_image, shm)
return Response(result)
У такого подхода есть несколько минусов:
SharedMemoryManager
создает отдельный процесс, с которым все также приходится общаться по средствамPipe
или сетевого сокета, что ухудшает производительность;память почистить можно только в том процессе, который создал
SharedMemoryManager
. То есть там где мы выполнили методstart()
или создали контекстный менеджер.
В итоге для простых сценариев этот метод подходит и позволяет самому не управлять памятью, но для более сложных сценариев этот способ не подойдет.
«Владение» разделяемой памятью
Чтобы написать свой менеджер для управления разделяемой память, воспользуемся идеей владения (ownership) ресурсом.
Начнем с примера.
У нас есть 2 процесса – основной и подпроцесс обработчик. Также есть очередь, через которую они общаются. Процесс-обработчик не возвращает картинку, а отдает строку с ответом.
Для этого будем считать, что если мы передали блок разделяемой памяти в очередь (то есть передали в другой процесс), то мы теряем владение над этой памятью. Если мы не владеем этой памятью, то когда вызовется деструктор __del__
, мы просто ничего не делаем.
Если же мы владеем этой памятью, то мы можем вручную вызвать метод unlink()
, так как владение гарантирует нам, что память не будет использоваться в другом процессе.
Напишем класс SharedMemoryWrapper
from multiprocessing import shared_memory
class SharedMemoryWrapper:
def __init__(self, data: bytes) -> None:
self._shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(data))
self._owner = True
self._shm.buf[:len(data)] = data
@property
def buf(self) -> memoryview:
return self._shm.buf
def __getstate__(self) -> dict[str, Any]:
self._owner = False
return {'name': self._shm.name}
def __setstate__(self, state: dict[str, Any]) -> None:
self._shm = shared_memory.SharedMemory(name=state['name'], create=False)
self._owner = True
def __del__(self) -> None:
self._shm.close()
if self._owner:
self._shm.unlink()
Рассмотрим по шагам:
добавляем флаг
_owner=True
в наш wrapper;изначально выставляем его в
True
;когда мы передаем его в очередь, сбрасываем
_owner=False
;когда принимаем его из очереди, выставляем
_owner=True
;если GC сработал в подпроцессе обработчике, то вызывается
__del__
. Флаг_owner == True
- мы владеем объектом и поэтому удаляем его, вызвавunlink()
;главный процесс отдал объект и потерял над ним владение. Он проверяет этот же флаг
_owner == False
, и поэтому вызывает только методclose()
.
Здесь мы использовали методы __getstate__
и __setstate__
, чтобы завязаться на события отправки и получения из очереди. По факту эти методы вызываются при pickle
и unpickle
.
В итоге наш сервис будет выглядеть примерно так:
from concurrent.futures import ProcessPoolExecutor
def detect_image(shm: SharedMemoryWrapper) -> str:
image = shm.buf
# cpu-bound task ...
return 'cat'
async def http_handler(request: Request) -> Response:
loop = asyncio.get_running_loop()
image = await request.read()
shm = SharedMemoryWrapper(image)
result = await loop.run_in_executor(executor, detect_image, shm)
return Response(result)
Нам не нужно вручную управлять памятью или завязываться на SharedMemoryManager
. Кода нужно писать меньше и он понятнее.
И все было бы хорошо, но такой код работает не во всех случаях. Этот вариант работает только тогда, когда мы гарантированно знаем, что объект с разделяемой памятью пересылается только в один процесс одновременно.
Если мы хотим отправить одну картинку одновременно в несколько подпроцессов, каждый из которых делает какую-то свою задачу, например, определяет животное, породу и цвет, то в этом случае мы не сможем использовать этот подход, потому что не понятно, кто владеет разделяемой памятью и кому нужно очищать ее.
Для такого более сложного случая нужно реализовать счетчик ссылок RC.
Счетчик ссылок
В разделяемой памяти можно хранить не только наши данные, но и метаданные. В нашем случае мы можем хранить счетчик ссылок. Для примера будем хранить RC в последнем дополнительном байте нашей разделяемой памяти. Наш флаг _owner
мы заменим на счётчик и будем его инкрементировать и декрементировать. Когда он будет равен нулю, мы точно будем знать, что можем удалить этот объект.
from multiprocessing import shared_memory
class SharedMemoryWrapper:
def __init__(self, data: bytes) -> None:
self._shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(data) + 1)
self._shm.buf[:len(data)] = data
self._shm.buf[-1] = 1 # reference counter
@property
def buf(self) -> memoryview:
return self._shm.buf
def __getstate__(self) -> dict[str, Any]:
self._shm.buf[-1] += 1
return {'name': self._shm.name}
def __setstate__(self, state: dict[str, Any]) -> None:
self._shm = shared_memory.SharedMemory(name=state['name'], create=False)
def __del__(self) -> None:
self._shm.buf[-1] -= 1
rc = self._shm.buf[-1]
self._shm.close()
if rc == 0:
self._shm.unlink()
Если мы запустим эту версию, то получим Segmentation Fault. Наше приложение либо будет падать, либо мы будем иногда получать утечки памяти.
Здесь происходит стандартная гонка (race condition). Каждый из процессов сначала получает текущее значение счетчика ссылок, а потом его увеличивает или уменьшает. При этом другие процессы параллельно делают тоже самое. В итоге мы либо освободим память несколько раз и приложение упадёт, либо не освободим и у нас будет утечка.
Логичное решение этой проблемы использовать блокировку на изменение счетчика ссылок. Реализовать это можно в виде глобального общего словаря, который будет хранить счетчик ссылок для каждого блока разделяемой памяти, а изменение счетчика будет происходить через блокировку.
from multiprocessing import Manager, Lock
COUNTERS = Manager().dict()
class GlobalRC:
def __init__(self) -> None:
self._counters = COUNTERS
self._lock = Lock()
def incr(self, shm_name: str) -> None:
with self._lock:
if shm_name not in self._counters:
self._counters[shm_name] = 0
self._counters[shm_name] += 1
def decr(self, shm_name: str) -> None:
with self._lock:
self._counters[shm_name] -= 1
curr = self._counters[shm_name]
if curr == 0:
self._counters.pop(shm_name)
return curr
Тут мы используем Lock
из модуля multiprocessing
, который предоставляет блокировку, чтобы гарантировать, что в любой момент времени только один процесс будет изменять счетчик ссылок.
Для того, чтобы словарь был глобальным для всех процессов, мы используем Manager().dict()
. Manager
создает отдельный процесс для работы с глобальным словарем, который будет доступен всем процессам.
Все это позволит нам добиться стабильного кода и ошибок памяти больше не будет.
Попробуем провести нагрузочное еще раз.
Зеленый график – это baseline, когда мы не используем общий словарь и блокировки, но работа с разделяемой памятью не безопасна.
Красный график – это версия с использованием глобального общего словаря и блокировок.
По графику видно, что на относительно небольшой нагрузке мы работаем почти как baseline. При росте нагрузки версия со словарем начинает страдать, появляются всплески времени ответа, и сервис ведет себя непредсказуемо.
В версии с общим словарем у нас получилось что-то похожее на свой локальный GIL. Также на скорость влияет, что словарь созданный через Manager
и поэтому он живет в отдельном процессе и с ним нужно общаться через socket
или pipe
с сериализацией и десериализацией.
Атомарный счетчик ссылок
Альтернативный способ свести время блокировок к минимуму — это использовать Атомики. Это набор атомарных CPU-инструкций, которые позволяют изменять значение области памяти в конкурентной среде, где за эту память могут бороться несколько ядер CPU, эксклюзивно и без гонок.
Тут мы встречаем первое место, где в стандартной библиотеке python нет решения нашей проблемы. Чтобы воспользоваться атомиками, нам придется написать немного C-кода. Для этого мы будем использовать библиотеку
cffi
. Эта библиотека позволяет налету прямо в рантайме скомпилировать C код и подключить его в python-приложение.
Всего нам понадобится четыре операции: установить и загрузить значение, увеличить значение и получить результат, уменьшить значение и получить результат.
atomic = ffi.verify("""
uint32_t load_uint32(uint32_t *v) {
return __atomic_load_n(v, __ATOMIC_SEQ_CST);
};
void store_uint32(uint32_t *v, uint32_t n) {
uint32_t i = n;
__atomic_store(v, &i, __ATOMIC_SEQ_CST);
};
uint32_t add_and_fetch_uint32(uint32_t *v, uint32_t i) {
return __atomic_add_fetch(v, i, __ATOMIC_SEQ_CST);
};
uint32_t sub_and_fetch_uint32(uint32_t *v, uint32_t i) {
return __atomic_sub_fetch(v, i, __ATOMIC_SEQ_CST);
};
""")
Теперь эти низкоуровневые операции мы можем обернуть в удобный класс атомарный счетчик AtomicCounter
class AtomicCounter:
def __init__(self, view: memoryview):
self._ptr = ffi.cast('uint32_t*', ffi.from_buffer(view[:self.size()]))
def get(self):
return atomic.load_uint32(self._ptr)
def set(self, n):
return atomic.store_uint32(self._ptr, n)
def inc(self):
return atomic.add_and_fetch_uint32(self._ptr, 1)
def dec(self):
return atomic.sub_and_fetch_uint32(self._ptr, 1)
@staticmethod
def size():
return ffi.sizeof('uint32_t')
Далее мы встраиваем этот класс в наш SharedMemoryWraper
class SharedMemoryWrapper:
def __init__(self, size: int, shm: shared_memory.SharedMemory = None, shm_name: str = None):
self._shm: shared_memory.SharedMemory = None # noqa
if shm is None and shm_name is None:
rc_size = AtomicCounter.size()
shm = shared_memory.SharedMemory(create=True, size=size + rc_size)
self._rc = AtomicCounter(shm.buf)
self._rc.set(1)
self._attach_shm(shm, shm_name)
self._shm_name = self._shm.name
@property
def buf(self) -> memoryview:
return self._shm.buf[AtomicCounter.size():]
@property
def ref_count(self) -> int:
return self._rc.get()
def _attach_shm(self, shm: shared_memory.SharedMemory = None, shm_name: str = None):
if shm is None:
shm = shared_memory.SharedMemory(shm_name)
self._shm = shm
self._shm_name = shm.name
def __getstate__(self) -> dict:
self._rc.inc()
return state
def __setstate__(self, state: dict):
self._attach_shm(shm_name=self._shm_name)
self._rc = AtomicCounter(self._shm.buf)
def __del__(self):
curr_rc = self._rc.dec()
if curr_rc == 0:
self._shm.unlink()
Проводим нагрузочное
Теперь по графику видно, что график сервиса с использованием атомарного счетчика сравним с baseline графиком и больше нет непредсказуемых скачков времени ответа.
В итоге у себя в Aqueduct мы используем именно этот подход для работы с разделяемой памятью.
У этого подхода есть и минус. Во время работы сервиса процессы могут упасть из-за какой-то непредвиденной ошибки или операционная система может их убить, например, если они использовали слишком много памяти. При этом они не смогут декриментить счетчик ссылок.
Если продолжить дальше работать в таком состоянии, например попробовать перезапустить упавший процесс, то у нас могут возникнуть утечки памяти или segmentation fault при попытке лишний раз очистить память.
У себя в Aqueduct мы решили, что при падении любого подпроцесса будем завершать все подпроцессы и работу всего пайплайна. После этого наш метод is_running
возвращает False
и при попытке отправить в обработку новые задания мы бросаем исключение NotRunningError
.
Далее приложение определяет, что Aqueduct больше не работает и успешно завершается, а инфраструктура (kubernetes, systemd, supervisord) перезапустит приложение.
Заключение
В статье мы рассмотрели несколько способов как можно работать с разделяемой памятью в python. Каждый способ может быть полезным в той или иной задаче.
Мы начали с простого сервиса, который в синхронном режиме в один поток прогонял картинку и едва держал 1 RPS. Добавив три дочерних процесса, разделяемую память и атомарные счетчики, у нас получилось разогнать наш сервис до 900 RPS.
Все это мы делали вручную, используя стандартную библиотеку python. Каждый раз такие пайплайны писать долго и их сложно тестировать. Еще и все усложняется, если шагов в пайплайне больше одного.
Для решения подобных задач можно использовать Aqueduct, потому что он предоставляет все выше описанные возможности из коробки.
Есть оптимизации, которые непосредственно влияют на производительность сервиса:
Батчирование данных - ускоряет обработку и максимально утилизирует GPU, потому что данные отправляются на видео карту одновременно одной пачкой, а не последовательно одно за другим
Разделяемая память - меньше копирований данных, сериализаций и десериализаций между процессами.
Aqueduct – это готовый фреймворк для создания мультипроцесcинговых пайплайнов обработки данных, который обеспечивает:
Прозрачность организации кода в небольших подпроцесах - шагах (
FlowStep
)Описание практически в декларативном виде пайплайна обработки данных (
FLow
)Управление мультипроцесcингом - создание подпроцессов на каждый шаг пайплайна и организация их жизненного цикла и взаимодействия
Сбор метрик работы пайплайна.
Как использовать атомарный счетчик ссылок можно посмотреть у нас в Aqueduct тут.
Вопросы и отзывы о Aqueduct очень ждем в комментариях к статье! Спасибо вам за уделенное время, надеюсь, что наш опыт и наработки будут вам полезны.
parus-lead
Очень интересная статья! Спасибо за подробности. Очень понравилось выделение в отдельный класс AtomicCounter.