Всем привет! Меня зовут Дмитрий, я руковожу командой государственных интеграций в Ozon Банке. Сегодня я расскажу о том, как мы столкнулись с проблемой гонок при батчевой обработке данных в распределённой системе — и какие решения мы рассматривали, чтобы эту проблему решить. Материал основан на реальном кейсе и будет интересен всем, кто работает с PostgreSQL, батчами, распределёнными системами и борьбой за консистентность в высоконагруженных системах.

Предыстория

Летом 2024 года к нашей команде перешёл только что запущенный сервис, отвечающий за обработку и обмен документами, требующими строгого соответствия регламентам государственных структур. Особенность сервиса — он работает на Windows ВМ. Причина в том, что для выполнения ключевых операций используется проприетарная утилита одного из госорганов, доступная только на Windows. У нас было 4 инстанса приложения.

Снимок экрана 2025-03-18 в 09.36.54.png
Схема приложения

Архитектурно система строится на PostgreSQL и наборе воркеров, которые в фоновом режиме выполняют батчевую обработку документов:

  1. Из базы выбираются записи, удовлетворяющие определённым условиям.

  2. Эти записи обрабатываются (стоит сказать, что обработка весьма затратная, и время её работы измеряется в минутах).

  3. Их статус обновляется.

Снимок экрана 2025-03-18 в 10.20.11.png

Очевидно, что мы хотим по максимуму распараллелить обработку документов и для этого после этапа (1) выборки из бд, мы блокируем документы, чтобы избежать их параллельной обработки сразу на нескольких инстансах приложения.

Блокировка была реализована с помощью специальной таблицы json_locks, в которую заносились идентификаторы документов, взятых в работу.

Структура таблицы json_locks:

И псевдо-SQL, который отражает логику работы этапа выборки и блокировки документов:

BEGIN;
SELECT * FROM documents WHERE status = 'new' LIMIT batch_size;
INSERT INTO json_locks(id, locked_keys, locked_untill)
VALUES (..., ARRAY[ids], ...);
COMMIT;
Снимок экрана 2025-03-14 в 19.55.40.png

С точки зрения прозрачности решение отличное — мы могли посмотреть, когда и какие документы были взяты в работу. Но с ростом нагрузки появилась проблема.

Аномалии в PostgreSQL

Вы наверняка знаете, что в PostgreSQL есть ряд аномалий, проявляющихся на разных уровнях изоляции: грязное чтение, неповторяющееся чтение, чтение фантомов и потерянное обновление. Но правда в том, что есть ещё набор аномалий, про которые говорят гораздо реже: сдвиг чтения, сдвиг записи, потерянная вставка и проблема сериализуемости. С одной из них мы и столкнулись.

Снимок экрана 2025-03-18 в 12.29.28.png

Допустим, у нас 2 инстанса приложения. На первом происходит выборка документов в момент t1. Через доли секунды второй инстанс делает то же самое. Поскольку в таблице json_locks на этот момент ещё нет записей о текущих документах, оба запроса могут вернуть пересекающиеся множества ID.

Снимок экрана 2025-03-18 в 12.31.23.png

Далее оба инстанса делают вставку в json_locks. PostgreSQL (в уровне изоляции READ COMMITTED) не видит конфликта — транзакции проходят успешно, и один и тот же документ может быть обработан одновременно двумя воркерами.

PostgreSQL не блокирует вставку, потому что операции касаются разных строк, и на уровне модели данных конфликта нет. Но логически — он есть.

Какие это можно починить?

Мы системно проанализировали все возможные подходы. Чтобы не упускать детали, ниже описаны 6 стратегий, которые мы рассмотрели или протестировали.

1. Повышение уровня изоляции транзакций

Самое очевидное: если сдвиг записи (write skew) возникает при READ COMMITTED, может, стоит просто использовать SERIALIZABLE, на котором он не воспроизводится?

Снимок экрана 2025-03-18 в 12.55.14.png

Здесь у нас есть 2 опции:

(а) повысить уровень изоляции во всей системе

(б) или повысить уровень изоляции только для тех транзакций, в которых эта проблема может появляться

(а) Увеличение уровня изоляции приводит к снижению производительности — насколько сильное, зависит от системы, в которой проводится эта операция. И чтобы ответить на вопрос, «норм ли нам будет с такой нагрузкой», придётся провести нагрузочное тестирование — а это операция не бесплатная.

(б) Опять же изменения простые. В BEGIN TRANSACTION достаточно добавить небольшую приписку с указанием уровня изоляции. Но и тут есть подводные камни: если одни части кода используют READ COMMITTED, а другие SERIALIZABLE, это может привести к неожиданным блокировкам или конфликтам, которые к тому же будет непросто отдебажить

Итого

Плюсы:

  • Простота реализации

  • PostgreSQL гарантирует отсутствие write skew

Минусы:

  • Потенциальное падение производительности при высоких нагрузках

  • Возможны serialization failures, которые нужно обрабатывать в коде

  • Разные уровни изоляции в разных частях кода могут привести к блокировкам

2. Advisory Lock

Advisory locks — это механизм, предоставляющий пользователям возможность реализовывать собственную систему блокировок, не связанную напрямую с блокировками на уровне таблиц или строк в БД. Это удобный способ синхронизации доступа к ресурсам как между транзакциями, так и между разными сессиями. Выглядит это примерно так:

SELECT pg_advisory_xact_lock(произвольный_числовой_ключ);

Advisory Lock устанавливается на произвольный числовой ключ, по которому и происходит блокировка. В PostgreSQL Advisory Locks делятся на 2 уровня:

  • Session-level — держатся до конца сессии, даже если транзакция завершилась

  • Transaction-level — автоматически освобождаются в конце транзакции

Также бывает 2 типа локов:

  • Exclusive — только один процесс; может держать лок по ключу

  • Shared — несколько процессов могут одновременно удерживать shared-лок по одному ключу, но если кто-то попытается взять exclusive — он будет ждать, пока все shared не отпустят.
    Механизм похож на RWMutex в Go: RLock — это shared лок, Lock — exclusive лок.

И, наконец, у каждого лока есть 3 способа взаимодействия:

  • Acquire — получить лок, подождать, если занят.

  • Try — попытаться получить лок без ожидания.

  • Release — явно освободить лок.

В итоге суть решения — добавить строку вида pg_advisory_lock(hash(worker_name)) на этапе выборки документов.

Но в работе с advisory локами стоит быть аккуратными: если вы всё же решаете использовать session-level advisory локи, то важно следить за тем, чтобы они не выедали пул коннектов. Это может произойти, если случайно где-то забыли сделать release на коннект.

А если некорректно настроить таймаут, система может где-то простаивать из-за действующей блокировки. Что тоже негативно скажется на пропускной способности.

Итого
Плюсы:

  • Простота реализации

Минусы:

  • Если использовать session-level lock, возможны утечки коннектов

  • Сложнее отлаживать проблемы

3. Rocket Lock

Внутри Ozon Банка существует часто используемый подход к реализации распределенных блокировок, с помощью отдельного механизма Rocket Lock. 

Решение основано на специальной таблице из 2 колонок: строковый ключ блокировки и поле locked_until. В последнем отражено время, до которого действует блокировка по данному ключу.

Идея сильно похожа на advisory локи, но информация о блокировках хранится не под капотом бд. А в обычной табличке, с которой всем удобно и привычно работать — и по необходимости кастомить.

Псевдокод метода работы с этим решением:

Снимок экрана 2025-03-29 в 13.22.57.png

При попытке захватить лок:

  • если запись есть и locked_until > now() — выкидываем зарезервированную ошибку, по которой мы понимаем, что лок уже взят другим процессом,

  • если locked_until истёк — обновляем; за счёт этого обеспечивается переиспользуемость ключей.

Изменения, которые нужно внести — заменить транзакцию из этапа выборки и блокировки данных на Rocket Lock. Ну и реализовать методы по работе с новой таблицей

Итого
Плюсы:

  • Проблемы легко диагностировать

Минусы:

  • Более высокая нагрузка на БД в зависимости от сценария использования

  • Нужна создать доп. таблицу и продумать логику работы с ней

4. Нормализация json_locks

Наверняка, глядя на структуру таблицы json_locks, вы подумали: «Вот бы можно было на колонку с айдишниками просто накинуть индекс уникальности значений». И данное решение основано как раз на этой мысли)

В таблице json_locks для блокировки можно использовать не 1 строку с массивом айдишников, а несколько строк с одним айдишником в каждой. А на столбец с айди блокируемого документа настроить индекс на уникальности. Так мы гарантируем, что ни один документ не попадёт в обработку сразу несколькими инстансами приложения — при вставке в json_locks выпадет ошибка.

Новая схема таблицы json_locks выглядит так: 

В части выборки данных для обработки ключевое изменение — это то, что теперь нам не нужна транзакция. Уникальность данных обеспечивается по constraint в бд. Изменения произойдут и в методе вставки в таблицу json_locks — теперь там будет вставка множества строк, а не одной, как раньше.

Таким образом, логика становится более прозрачной и тестируемой. Заодно исчезает необходимость в использовании транзакции: выборку документов и создание блокировки в json_locks можно выполнять без неё.

Но в то же время появляются проблемы с производительностью — в зависимости от размера выбираемых батчей и параллельно запущенных инстансов приложения. А также уменьшится пропускная способность системы, поскольку неизбежно батчи будут неполными — обработка будет происходить только по новым записям в json_locks. Кроме того, решение предполагает изменение структуры БД, что автоматически делает его недешёвым в реализации.

Итого
Плюсы:

  • Гарантия на корректность работы на уровне БД

  • Упрощение логики выборки — транзакция не нужна

Минусы:

  • Сложность миграции

  • Возможные проблемы с производительностью при больших батчах

5. Промежуточный статус

Это решение основано уже не на изменениях в схеме бд или подходах к работе с ней, а на изменениях в приложении. Если добавить промежуточный статус для документов, выбранных к обработке — in_progress_or_something_like_this, и использовать UPDATE ... SET status=’in_progress_or_something_like_this’ WHERE id IN (SELECT ... FOR UPDATE SKIP LOCKED) RETURNING ...;, то мы выберем и заблокируем все документы к обработке без всяких дополнительных транзакций и таблиц блокировки!

Но есть у такого решения и минусы. Что будет, если обработка завершится неуспешно после этапа выборки данных и их перевода в промежуточный статус? Потребуется дополнительная логика «добивки», которая будет переводить застрявшие в промежуточном статусе данные.

Второй минус — риск переусложнения статусной модели за счёт добавления избыточного количества статусов. Статус придётся добавлять на каждый этап обработки, а суммарно их будет столько же, сколько разных воркеров для обработки в пайплайне.

И третья из возможных проблем: за счёт skip locked возможны ситуации, когда фактически выбранные для обработки батчи меньше ожидаемых размеров, что уменьшает пропускную способность системы.

Итого
Плюсы:

  • Простота и прозрачность

  • Нет необходимости во вспомогательных таблицах

Минусы:

  • Нужна логика "добивки" застрявших документов

  • Статусная модель расширяется

  • Возможны неполные батчи

6. Kafka

Теперь попробуем выйти за рамки нашего приложения и подумать над решением, которое влияет на инфраструктуру проекта. Для того, чтобы охватить планируемые изменения, нам нужно будет выйти за пределы этого локального блока с выборкой данных. А также посмотреть на ту часть, где документы переводят в статус, требуемый в нашей выборке. Теперь здесь мы будем не только обновлять статус документов, но и отправлять сообщение в кафку. 

Снимок экрана 2025-03-29 в 16.01.26.png

И у нас появляется знакомая многим проблема. Что если наше приложение по какой-то причине завершит работу после того, как мы запродюсили сообщение в кафку, но до того, как статус в бд обновился?

Здесь нам на помощь приходит популярный паттерн transactional outbox, чтобы сообщения не терялись. Идея состоит в том, чтобы не сразу продюсить сообщение, а сначала сделать запись в специальной табличке outbox. И уже из неё специальный бэкграунд воркер вычитает и отправит сообщения.

Сам паттерн transactional outbox называется так, потому что запись в отдельную таблицу можно проводить в транзакции, как и другие SQL-запросы. Одновременно с обновлениями статусов или иными действиями, которые могут проходить вместе с отправкой нотификации в кафку.

Снимок экрана 2025-03-29 в 16.14.25.png

И сообщения, которые мы в кафку записали, где-то потребуется вычитывать — причём делать это батчами. А из нужной партиции кафки, их число необходимо сделать кратным количеству инстансов приложения. Напомню, что кафка даёт гарантию, что одно и то же сообщение попадет в одну и ту же партицию, за счёт этого мы гарантированно не встретим дубликатов документов на разных инстансах приложения.

Теперь всё, что нам нужно — достать документы с переданными айдишниками и запустить для них обработку. Важно отметить, что некоторые детали здесь опущены, например: мы не проговорили про необходимость dead letter queue. А итоговое решение потребует несколько больших доработок — по сравнению с тем, что сейчас озвучено. Но ключевой механизм будет работать в той форме, которую мы описали.

Снимок экрана 2025-03-29 в 16.31.48.png

Итого

Плюсы:

  • Масштабируемость

  • Высокая отказоустойчивость

Минусы:

  • Дорогой вариант по времени внедрения

Выводы

Каждое из рассмотренных решений имеет право на жизнь. Всё зависит от того, в какой стадии находится ваш проект, и ваших целей относительно его. Надеюсь, что этот материал поможет вам избежать распространённых ловушек при реализации распределённой обработки и даст конкретные инструменты для борьбы с конкурентной обработкой. Спасибо!

Комментарии (0)