Всем привет! Меня зовут Дмитрий, я руковожу командой государственных интеграций в Ozon Банке. Сегодня я расскажу о том, как мы столкнулись с проблемой гонок при батчевой обработке данных в распределённой системе — и какие решения мы рассматривали, чтобы эту проблему решить. Материал основан на реальном кейсе и будет интересен всем, кто работает с PostgreSQL, батчами, распределёнными системами и борьбой за консистентность в высоконагруженных системах.
Предыстория
Летом 2024 года к нашей команде перешёл только что запущенный сервис, отвечающий за обработку и обмен документами, требующими строгого соответствия регламентам государственных структур. Особенность сервиса — он работает на Windows ВМ. Причина в том, что для выполнения ключевых операций используется проприетарная утилита одного из госорганов, доступная только на Windows. У нас было 4 инстанса приложения.

Архитектурно система строится на PostgreSQL и наборе воркеров, которые в фоновом режиме выполняют батчевую обработку документов:
Из базы выбираются записи, удовлетворяющие определённым условиям.
Эти записи обрабатываются (стоит сказать, что обработка весьма затратная, и время её работы измеряется в минутах).
Их статус обновляется.

Очевидно, что мы хотим по максимуму распараллелить обработку документов и для этого после этапа (1) выборки из бд, мы блокируем документы, чтобы избежать их параллельной обработки сразу на нескольких инстансах приложения.
Блокировка была реализована с помощью специальной таблицы json_locks, в которую заносились идентификаторы документов, взятых в работу.
Структура таблицы json_locks:
И псевдо-SQL, который отражает логику работы этапа выборки и блокировки документов:
BEGIN;
SELECT * FROM documents WHERE status = 'new' LIMIT batch_size;
INSERT INTO json_locks(id, locked_keys, locked_untill)
VALUES (..., ARRAY[ids], ...);
COMMIT;

С точки зрения прозрачности решение отличное — мы могли посмотреть, когда и какие документы были взяты в работу. Но с ростом нагрузки появилась проблема.
Аномалии в PostgreSQL
Вы наверняка знаете, что в PostgreSQL есть ряд аномалий, проявляющихся на разных уровнях изоляции: грязное чтение, неповторяющееся чтение, чтение фантомов и потерянное обновление. Но правда в том, что есть ещё набор аномалий, про которые говорят гораздо реже: сдвиг чтения, сдвиг записи, потерянная вставка и проблема сериализуемости. С одной из них мы и столкнулись.

Допустим, у нас 2 инстанса приложения. На первом происходит выборка документов в момент t1. Через доли секунды второй инстанс делает то же самое. Поскольку в таблице json_locks на этот момент ещё нет записей о текущих документах, оба запроса могут вернуть пересекающиеся множества ID.

Далее оба инстанса делают вставку в json_locks. PostgreSQL (в уровне изоляции READ COMMITTED) не видит конфликта — транзакции проходят успешно, и один и тот же документ может быть обработан одновременно двумя воркерами.
PostgreSQL не блокирует вставку, потому что операции касаются разных строк, и на уровне модели данных конфликта нет. Но логически — он есть.
Какие это можно починить?
Мы системно проанализировали все возможные подходы. Чтобы не упускать детали, ниже описаны 6 стратегий, которые мы рассмотрели или протестировали.
1. Повышение уровня изоляции транзакций
Самое очевидное: если сдвиг записи (write skew) возникает при READ COMMITTED, может, стоит просто использовать SERIALIZABLE, на котором он не воспроизводится?

Здесь у нас есть 2 опции:
(а) повысить уровень изоляции во всей системе
(б) или повысить уровень изоляции только для тех транзакций, в которых эта проблема может появляться
(а) Увеличение уровня изоляции приводит к снижению производительности — насколько сильное, зависит от системы, в которой проводится эта операция. И чтобы ответить на вопрос, «норм ли нам будет с такой нагрузкой», придётся провести нагрузочное тестирование — а это операция не бесплатная.
(б) Опять же изменения простые. В BEGIN TRANSACTION достаточно добавить небольшую приписку с указанием уровня изоляции. Но и тут есть подводные камни: если одни части кода используют READ COMMITTED, а другие SERIALIZABLE, это может привести к неожиданным блокировкам или конфликтам, которые к тому же будет непросто отдебажить
Итого
Плюсы:
Простота реализации
PostgreSQL гарантирует отсутствие write skew
Минусы:
Потенциальное падение производительности при высоких нагрузках
Возможны serialization failures, которые нужно обрабатывать в коде
Разные уровни изоляции в разных частях кода могут привести к блокировкам
2. Advisory Lock
Advisory locks — это механизм, предоставляющий пользователям возможность реализовывать собственную систему блокировок, не связанную напрямую с блокировками на уровне таблиц или строк в БД. Это удобный способ синхронизации доступа к ресурсам как между транзакциями, так и между разными сессиями. Выглядит это примерно так:
SELECT pg_advisory_xact_lock(произвольный_числовой_ключ);
Advisory Lock устанавливается на произвольный числовой ключ, по которому и происходит блокировка. В PostgreSQL Advisory Locks делятся на 2 уровня:
Session-level — держатся до конца сессии, даже если транзакция завершилась
Transaction-level — автоматически освобождаются в конце транзакции
Также бывает 2 типа локов:
Exclusive — только один процесс; может держать лок по ключу
Shared — несколько процессов могут одновременно удерживать shared-лок по одному ключу, но если кто-то попытается взять exclusive — он будет ждать, пока все shared не отпустят.
Механизм похож на RWMutex в Go: RLock — это shared лок, Lock — exclusive лок.
И, наконец, у каждого лока есть 3 способа взаимодействия:
Acquire — получить лок, подождать, если занят.
Try — попытаться получить лок без ожидания.
Release — явно освободить лок.
В итоге суть решения — добавить строку вида pg_advisory_lock(hash(worker_name)) на этапе выборки документов.
Но в работе с advisory локами стоит быть аккуратными: если вы всё же решаете использовать session-level advisory локи, то важно следить за тем, чтобы они не выедали пул коннектов. Это может произойти, если случайно где-то забыли сделать release на коннект.
А если некорректно настроить таймаут, система может где-то простаивать из-за действующей блокировки. Что тоже негативно скажется на пропускной способности.
Итого
Плюсы:
Простота реализации
Минусы:
Если использовать session-level lock, возможны утечки коннектов
Сложнее отлаживать проблемы
3. Rocket Lock
Внутри Ozon Банка существует часто используемый подход к реализации распределенных блокировок, с помощью отдельного механизма Rocket Lock.
Решение основано на специальной таблице из 2 колонок: строковый ключ блокировки и поле locked_until. В последнем отражено время, до которого действует блокировка по данному ключу.

Идея сильно похожа на advisory локи, но информация о блокировках хранится не под капотом бд. А в обычной табличке, с которой всем удобно и привычно работать — и по необходимости кастомить.
Псевдокод метода работы с этим решением:

При попытке захватить лок:
если запись есть и locked_until > now() — выкидываем зарезервированную ошибку, по которой мы понимаем, что лок уже взят другим процессом,
если locked_until истёк — обновляем; за счёт этого обеспечивается переиспользуемость ключей.
Изменения, которые нужно внести — заменить транзакцию из этапа выборки и блокировки данных на Rocket Lock. Ну и реализовать методы по работе с новой таблицей
Итого
Плюсы:
Проблемы легко диагностировать
Минусы:
Более высокая нагрузка на БД в зависимости от сценария использования
Нужна создать доп. таблицу и продумать логику работы с ней
4. Нормализация json_locks
Наверняка, глядя на структуру таблицы json_locks, вы подумали: «Вот бы можно было на колонку с айдишниками просто накинуть индекс уникальности значений». И данное решение основано как раз на этой мысли)
В таблице json_locks для блокировки можно использовать не 1 строку с массивом айдишников, а несколько строк с одним айдишником в каждой. А на столбец с айди блокируемого документа настроить индекс на уникальности. Так мы гарантируем, что ни один документ не попадёт в обработку сразу несколькими инстансами приложения — при вставке в json_locks выпадет ошибка.
Новая схема таблицы json_locks выглядит так:

В части выборки данных для обработки ключевое изменение — это то, что теперь нам не нужна транзакция. Уникальность данных обеспечивается по constraint в бд. Изменения произойдут и в методе вставки в таблицу json_locks — теперь там будет вставка множества строк, а не одной, как раньше.
Таким образом, логика становится более прозрачной и тестируемой. Заодно исчезает необходимость в использовании транзакции: выборку документов и создание блокировки в json_locks можно выполнять без неё.
Но в то же время появляются проблемы с производительностью — в зависимости от размера выбираемых батчей и параллельно запущенных инстансов приложения. А также уменьшится пропускная способность системы, поскольку неизбежно батчи будут неполными — обработка будет происходить только по новым записям в json_locks. Кроме того, решение предполагает изменение структуры БД, что автоматически делает его недешёвым в реализации.
Итого
Плюсы:
Гарантия на корректность работы на уровне БД
Упрощение логики выборки — транзакция не нужна
Минусы:
Сложность миграции
Возможные проблемы с производительностью при больших батчах
5. Промежуточный статус
Это решение основано уже не на изменениях в схеме бд или подходах к работе с ней, а на изменениях в приложении. Если добавить промежуточный статус для документов, выбранных к обработке — in_progress_or_something_like_this, и использовать UPDATE ... SET status=’in_progress_or_something_like_this’ WHERE id IN (SELECT ... FOR UPDATE SKIP LOCKED) RETURNING ...;, то мы выберем и заблокируем все документы к обработке без всяких дополнительных транзакций и таблиц блокировки!
Но есть у такого решения и минусы. Что будет, если обработка завершится неуспешно после этапа выборки данных и их перевода в промежуточный статус? Потребуется дополнительная логика «добивки», которая будет переводить застрявшие в промежуточном статусе данные.
Второй минус — риск переусложнения статусной модели за счёт добавления избыточного количества статусов. Статус придётся добавлять на каждый этап обработки, а суммарно их будет столько же, сколько разных воркеров для обработки в пайплайне.
И третья из возможных проблем: за счёт skip locked возможны ситуации, когда фактически выбранные для обработки батчи меньше ожидаемых размеров, что уменьшает пропускную способность системы.
Итого
Плюсы:
Простота и прозрачность
Нет необходимости во вспомогательных таблицах
Минусы:
Нужна логика "добивки" застрявших документов
Статусная модель расширяется
Возможны неполные батчи
6. Kafka
Теперь попробуем выйти за рамки нашего приложения и подумать над решением, которое влияет на инфраструктуру проекта. Для того, чтобы охватить планируемые изменения, нам нужно будет выйти за пределы этого локального блока с выборкой данных. А также посмотреть на ту часть, где документы переводят в статус, требуемый в нашей выборке. Теперь здесь мы будем не только обновлять статус документов, но и отправлять сообщение в кафку.

И у нас появляется знакомая многим проблема. Что если наше приложение по какой-то причине завершит работу после того, как мы запродюсили сообщение в кафку, но до того, как статус в бд обновился?
Здесь нам на помощь приходит популярный паттерн transactional outbox, чтобы сообщения не терялись. Идея состоит в том, чтобы не сразу продюсить сообщение, а сначала сделать запись в специальной табличке outbox. И уже из неё специальный бэкграунд воркер вычитает и отправит сообщения.
Сам паттерн transactional outbox называется так, потому что запись в отдельную таблицу можно проводить в транзакции, как и другие SQL-запросы. Одновременно с обновлениями статусов или иными действиями, которые могут проходить вместе с отправкой нотификации в кафку.

И сообщения, которые мы в кафку записали, где-то потребуется вычитывать — причём делать это батчами. А из нужной партиции кафки, их число необходимо сделать кратным количеству инстансов приложения. Напомню, что кафка даёт гарантию, что одно и то же сообщение попадет в одну и ту же партицию, за счёт этого мы гарантированно не встретим дубликатов документов на разных инстансах приложения.
Теперь всё, что нам нужно — достать документы с переданными айдишниками и запустить для них обработку. Важно отметить, что некоторые детали здесь опущены, например: мы не проговорили про необходимость dead letter queue. А итоговое решение потребует несколько больших доработок — по сравнению с тем, что сейчас озвучено. Но ключевой механизм будет работать в той форме, которую мы описали.

Итого
Плюсы:
Масштабируемость
Высокая отказоустойчивость
Минусы:
Дорогой вариант по времени внедрения
Выводы
Каждое из рассмотренных решений имеет право на жизнь. Всё зависит от того, в какой стадии находится ваш проект, и ваших целей относительно его. Надеюсь, что этот материал поможет вам избежать распространённых ловушек при реализации распределённой обработки и даст конкретные инструменты для борьбы с конкурентной обработкой. Спасибо!