Код в статье иллюстративный, показывает архитектурные решения и объясняет почему именно так. Не предназначен для copy-paste в прод без адаптации под вашу инфраструктуру, мониторинг и требования.

Думал, зайду в крипту и буду просто дёргать API блокчейна. Не вышло.

Захожу в проект. Стек: FastAPI, PostgreSQL, Redis как Celery broker, Celery workers, Docker, Web3. Стартап на хайпе, деньги реальные, архитектура собрана на коленке. Смотрю на архитектуру платёжного процессинга и первая мысль: ребята, вы серьёзно? Финансовые операции с реальными деньгами, без idempotency вообще, Redis как брокер без persistence, Web3.py синхронные вызовы внутри Celery тасков.

Разговор был короткий: задача такая, чини что есть. Сроки горели.

Что было сломано

Первый месяц прода. Пользователь пишет в поддержку: зачислили дважды, вывел двойную сумму. Открываю логи, чисто. Два идентичных события, оба 200, разница четыре секунды. Оба обработаны. Пользователь получил двойной баланс.

Ежедневная сверка с on-chain данными показала расхождение: несколько аккаунтов с балансом больше чем должно быть по confirmed транзакциям. За первый месяц нашли 23 дублирующих зачисления на ~180k транзакций, около 0.013% error rate. 23 двойных зачисления за месяц. Живые деньги, не метрика.

Первое, что вылезло: дубли от провайдера. Alchemy, Infura и все остальные блокчейн-провайдеры работают по at-least-once delivery. При сетевом сбое, рестарте, под нагрузкой провайдер повторяет доставку. Провайдер так и написал в доках. Это не баг, это условия игры. Провайдер повторяет доставку, твой код должен это переживать без последствий. Наш не переживал.

Дальше хуже. Два параллельных запроса на вывод читали баланс одновременно, оба видели достаточно средств, оба проходили валидацию. Два запроса читают баланс одновременно, оба видят что денег хватает, оба списывают. Школьная гонка.

async def withdraw(conn, user_id: int, amount: Decimal):
    balance = await conn.fetchval(
        "SELECT balance FROM users WHERE id = $1", user_id
    )
    if balance >= amount:
        await conn.execute(
            "UPDATE users SET balance = balance - $1 WHERE id = $2",
            amount, user_id
        )

Дальше. Celery с дефолтными настройками подтверждает задачу брокеру в момент получения. Воркер падает в середине обработки, событие подтверждено, до записи в БД не дошло. Никакого retry, никакого DLQ. Воркер упал, задача подтверждена, деньги не пришли. Пользователь ждёт и не понимает что случилось.

И отдельный тихий убийца: amount сериализуется в JSON через Celery broker как float. Decimal("50.1") превращается в JSON float, то есть в 50.099999999999994. На масштабе это копится в реальный убыток. Никто не заметил, пока не посчитали.

Последнее: прямой вызов .delay() из webhook handler создаёт окно между записью в БД и постановкой в очередь. Если процесс упал в этот момент, событие зависнет в pending без автоматического восстановления.

Итого пять проблем. Начал чинить.

Первый инстинкт: Redis distributed lock

SET NX EX на user_id. Паттерн описан у Antirez, реализован за 20 минут. Не взлетело.

Вот конкретный сценарий, который вскрылся в логах. Воркер берёт лок в Redis. Начинает транзакцию в PostgreSQL. Между этими двумя операциями OOM killer убивает процесс. PostgreSQL транзакция откатилась автоматически, баланс не изменился. Redis лок висит 30 секунд до TTL. Через 30 секунд следующий воркер берёт лок, видит что idempotency_key не записан (записать было некому, транзакция откатилась) и обрабатывает событие заново. Двойное зачисление. В логах оба воркера чистые.

Проблема не в размере TTL. Проблема в отсутствии cross-system атомарности между Redis и PostgreSQL. Redis не подходит, нет атомарности с PostgreSQL. Проверка в коде тоже не работает, два воркера оба пройдут SELECT до INSERT. Единственное что атомарно по определению unique constraint. С деньгами нет "почти правильно".

Схема базы данных

CREATE TABLE payment_events (
    event_id     TEXT PRIMARY KEY,
    user_id      INTEGER NOT NULL REFERENCES users(id),
    amount       NUMERIC(38, 18) NOT NULL,
    event_type   TEXT NOT NULL,
    status       TEXT NOT NULL DEFAULT 'pending',
    retry_count  INTEGER NOT NULL DEFAULT 0,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT valid_status CHECK (
        status IN ('pending', 'enqueued', 'processing', 'confirmed', 'failed')
    )
);

CREATE TABLE balance_events (
    id              BIGSERIAL PRIMARY KEY,
    user_id         INTEGER NOT NULL REFERENCES users(id),
    amount          NUMERIC(38, 18) NOT NULL,
    event_type      TEXT NOT NULL,
    source_event_id TEXT NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_balance_events_source UNIQUE (source_event_id, event_type)
);

CREATE TABLE processed_events (
    idempotency_key TEXT PRIMARY KEY,
    outcome         TEXT NOT NULL DEFAULT 'pending',
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE dead_letter_queue (
    id         BIGSERIAL PRIMARY KEY,
    event_id   TEXT NOT NULL,
    event_type TEXT NOT NULL,
    user_id    INTEGER NOT NULL,
    amount     NUMERIC(38, 18) NOT NULL,
    error      TEXT NOT NULL,
    attempt    INTEGER NOT NULL DEFAULT 1,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

ALTER TABLE users
    ADD COLUMN IF NOT EXISTS initial_balance NUMERIC(38, 18) NOT NULL DEFAULT 0,
    ADD CONSTRAINT balance_non_negative CHECK (balance >= 0);

CREATE INDEX idx_payment_events_pending
    ON payment_events (updated_at, created_at)
    WHERE status = 'pending';

CREATE INDEX idx_payment_events_enqueued
    ON payment_events (updated_at)
    WHERE status = 'enqueued';

CREATE INDEX idx_payment_events_processing
    ON payment_events (updated_at)
    WHERE status = 'processing';

CREATE INDEX idx_balance_events_user_id
    ON balance_events (user_id);

CREATE INDEX idx_balance_events_created_at
    ON balance_events (created_at DESC);

CREATE INDEX idx_processed_events_created
    ON processed_events (created_at);

CREATE INDEX idx_processed_events_pending_stale
    ON processed_events (created_at)
    WHERE outcome = 'pending';

CREATE INDEX idx_dlq_event_id
    ON dead_letter_queue (event_id);

CREATE INDEX idx_dlq_created_at
    ON dead_letter_queue (created_at DESC);

В схеме три неочевидных решения.

NUMERIC(38, 18), не NUMERIC(20, 8). Колонка amount хранится в ETH, не в wei. Webhook-провайдер присылает уже сконвертированное значение. Если ваш провайдер возвращает wei, конвертируйте на входе: amount_eth = Decimal(wei_str) / Decimal(10**18) до передачи в _validate_amount . ERC-20 сами объявляют decimals(): USDC/USDT - 6, WBTC - 8, DAI/WETH/MKR - 18. ETH в wei тоже 10^18. NUMERIC(20, 8) выдержит USDC/USDT, но физически не вмещает 18-decimal токены, поэтому берём worst case, NUMERIC(38, 18).

initial_balance нужна для сверки. При миграции заполняете её текущим балансом, UPDATE users SET initial_balance = balance WHERE <условие>. Это означает что balance_events начинают наполняться с нуля, и hot_path_balance_check корректно считает только тех пользователей, у которых все операции прошли через balance_events. Для новых систем initial_balance остаётся 0.

Отдельные индексы для pending/enqueued/processing вместо одного status IN (...), так как poller'ы используют разные паттерны доступа. idx_payment_events_pending, partial index с (updated_at, created_at) для ORDER BY created_at в enqueue_pending_events, иначе планировщик сортирует без индекса.

retry_count в payment_events добавлен для предотвращения бесконечного цикла pending -> enqueued при durable outage Redis, об этом подробнее в секции про деградацию.

Как это починилось

Инициализация

import os
import uuid
import json
import hmac
import random
import hashlib
import secrets
import threading
import structlog
import psycopg2
import psycopg2.extras
import psycopg2.pool
import redis as redis_lib
from typing import Literal, Optional
import re
from decimal import Decimal, InvalidOperation
from contextvars import ContextVar
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from celery import shared_task
from celery.exceptions import Ignore, MaxRetriesExceededError

logger = structlog.get_logger()

@dataclass(frozen=True)
class Settings:
    DATABASE_URL:   str
    WEBHOOK_SECRET: str
    ETH_RPC_URL:    str
    ALERT_EMAIL:    str
    REDIS_URL:      str = "redis://localhost:6379/0"

    @classmethod
    def from_env(cls) -> "Settings":
        required = ("DATABASE_URL", "WEBHOOK_SECRET", "ETH_RPC_URL", "ALERT_EMAIL")
        missing  = [k for k in required if not os.environ.get(k)]
        if missing:
            raise RuntimeError(f"Missing required env vars: {', '.join(missing)}")
        return cls(
            DATABASE_URL   = os.environ["DATABASE_URL"],
            WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"],
            ETH_RPC_URL    = os.environ["ETH_RPC_URL"],
            ALERT_EMAIL    = os.environ["ALERT_EMAIL"],
            REDIS_URL      = os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        )

settings = Settings.from_env()

_redis_client = redis_lib.Redis.from_url(
    settings.REDIS_URL,
    decode_responses=True,
    socket_connect_timeout=2,
    socket_timeout=2,
    retry_on_timeout=False,
)

send_alert: rate-limited обёртка над логгером. В проде заменяется на PagerDuty/OpsGenie SDK. Одинаковые alert_key внутри cooldown окна подавляются. Если ключ не задан, отправка без rate-limit, для одноразовых критичных алертов. Не бросает исключений никогда.

_alert_last_sent растёт при уникальных ключах. Если генерировать ключи per-event-id (а мы так и делаем для orphan-алертов), за месяц это несколько миллионов записей. Поэтому при переполнении сначала чистим устаревшие ключи, а если после чистки место всё равно не освободилось, подавляем новые. Костыль, да. Но за восемь месяцев не отвалился.

_alert_lock = threading.Lock()
_alert_last_sent: dict = {}
MAX_ALERT_KEYS = 1_000

def send_alert(message: str, alert_key: Optional[str] = None,
               cooldown_seconds: int = 300) -> None:
    try:
        if alert_key is None:
            logger.critical("ALERT", message=message)
            return
        with _alert_lock:
            now = datetime.now(timezone.utc)
            if len(_alert_last_sent) >= MAX_ALERT_KEYS:
                stale_cutoff = now - timedelta(seconds=cooldown_seconds * 2)
                stale = [k for k, v in _alert_last_sent.items() if v < stale_cutoff]
                for k in stale:
                    del _alert_last_sent[k]
                if len(_alert_last_sent) >= MAX_ALERT_KEYS and alert_key not in _alert_last_sent:
                    logger.warning("send_alert suppressed: rate limit dict full",
                                   alert_key=alert_key)
                    return
            last = _alert_last_sent.get(alert_key)
            if last and (now - last).total_seconds() < cooldown_seconds:
                return
            _alert_last_sent[alert_key] = now
        logger.critical("ALERT", message=message, alert_key=alert_key)
    except Exception as e:
        logger.error("send_alert failed", error=str(e))

class ImproperlyConfigured(RuntimeError):
    pass

Trace ID через всю цепочку

Каждый воркер получает свой ContextVar автоматически, шарить его между потоками невозможно.

_trace_id: ContextVar[str] = ContextVar('trace_id', default='')

def get_trace_id() -> str:
    return _trace_id.get() or 'no-trace'

def set_trace_id(tid: str) -> None:
    _trace_id.set(tid)

def new_trace_id() -> str:
    tid = str(uuid.uuid4())
    _trace_id.set(tid)
    return tid

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        lambda _, __, event_dict: {**event_dict, "trace_id": get_trace_id()},
        structlog.processors.JSONRenderer(),
    ]
)

Idempotency key через DB unique constraint

Ключ строится из event_id и event_type, пишется в отдельную таблицу с unique constraint в той же транзакции что и изменение баланса.

Redis не подходит, нет атомарности с PostgreSQL. Проверка в коде тоже не работает, два воркера оба пройдут SELECT до INSERT. Единственное, что атомарно по определению: unique constraint.

Сначала делал конкатенацию f"{event_id}::{event_type}". Словил коллизию при :: в event_id. Попробовал NUL-разделитель: f"{event_id}\0{event_type}".encode(). Тоже коллизия: _idempotency_key("a\x00b", "c") == _idempotency_key("a", "b\x00c"), оба дают байты b"a\x00b\x00c". Финальный вариант, length-prefix encoding: каждое поле предваряется 4-байтовой длиной, коллизия между полями принципиально невозможна.

class RetryableError(Exception):
    pass

class AlreadyProcessedError(Exception):
    pass

MAX_AMOUNT = Decimal("10") ** 20

_AMOUNT_RE = re.compile(r"^[0-9]+(\.[0-9]+)?([eE][+-]?[0-9]+)?$")

def _validate_amount(amount) -> Decimal:
    if isinstance(amount, float):
        raise ValueError(
            f"float не допускается — передавайте amount как str из JSON payload. "
            f"Получено: {amount!r}"
        )
    if isinstance(amount, str) and amount != amount.strip():
        raise ValueError(
            f"amount содержит whitespace: {amount!r}. "
            f"Передавайте amount без пробелов."
        )
    if isinstance(amount, str) and not _AMOUNT_RE.fullmatch(amount):
        raise ValueError(f"invalid amount format: {amount!r}")
    try:
        amount_decimal = Decimal(str(amount))
        if not amount_decimal.is_finite():
            raise ValueError(f"amount must be finite, got {amount_decimal}")
        if amount_decimal <= 0:
            raise ValueError(f"amount must be positive, got {amount_decimal}")
        if amount_decimal.normalize().as_tuple().exponent < -18:
            raise ValueError(f"amount precision exceeds 18 decimals: {amount_decimal}")
        if amount_decimal >= MAX_AMOUNT:
            raise ValueError(
                f"amount exceeds NUMERIC(38,18) capacity: {amount_decimal} >= 10^20"
            )
        return amount_decimal
    except InvalidOperation:
        raise ValueError(f"invalid amount format: {amount!r}")

def _idempotency_key(event_id: str, event_type: str) -> str:
    a = event_id.encode("utf-8")
    b = event_type.encode("utf-8")
    payload = (
        len(a).to_bytes(4, "big") + a +
        len(b).to_bytes(4, "big") + b
    )
    return hashlib.sha256(payload).hexdigest()

Два места в первой версии были сломаны на граничных случаях: MAX_AMOUNT = Decimal("10")**20 - 1 отвергала валидную сумму, а 50.0000000000000000000 уходило в отказ как exponent=-19 хотя значащих цифр там нет. Починено: 10**20 без -1, и normalize() перед проверкой exponent. На граничные значения своих валидаторов стоит писать тесты, узнаёшь интересное.

Ещё один сюрприз из той же серии: _validate_amount("+50.1"),_validate_amount("1_000") и _validate_amount("١٢٣") все возвращают корректный Decimal. Python толерантен к underscore-нотации, leading + и арабо-индийским цифрам. Для финансового валидатора это нежелательное поведение, на входе ожидается строго [цифры].[цифры]. Добавлен regex ^[0-9]+(\.[0-9]+)?([eE][+-]?[0-9]+)?$ перед Decimal(), отклоняет всё нестандартное.

FSM переходов, одна точка правды

Статус события - детерминированный конечный автомат. Сначала было три места с raw UPDATE payment_events SET status = .... Это нарушало инвариант FSM.

Переходы: pending идёт в enqueued через poller. Enqueued в processing когда воркер взял задачу. Из processing только в confirmed или failed. Отдельный edge, enqueued сразу в confirmed, нужен для replay path: когда processed_events уже содержит outcome=success, но воркер упал после того как записал это и до того как успел перевести payment_events в confirmed. Без этого edge события зависали бы в enqueued вечно.

VALID_TRANSITIONS: dict[str, set[str]] = {
    "pending":    {"enqueued", "failed"},
    "enqueued":   {"processing", "failed", "pending", "confirmed"},
    "processing": {"confirmed", "failed"},
}

TERMINAL_STATUSES = frozenset({"confirmed", "failed"})

def transition_event_status(cur, event_id, from_status, to_status):
    if to_status not in VALID_TRANSITIONS.get(from_status, set()):
        raise ValueError(f"невалидный переход: {from_status} -> {to_status}")

    cur.execute(
        "UPDATE payment_events SET status = %s, updated_at = NOW() "
        "WHERE event_id = %s AND status = %s",
        (to_status, event_id, from_status),
    )

    if cur.rowcount == 0:
        cur.execute("SELECT status FROM payment_events WHERE event_id = %s", (event_id,))
        actual = cur.fetchone()
        actual_status = actual["status"] if actual else "not found"

        if actual_status == to_status:
            logger.info("status already set", event_id=event_id, status=to_status)
            return

        if actual_status in TERMINAL_STATUSES:
            raise AlreadyProcessedError(f"event already terminal: {actual_status}")

        raise RetryableError(
            f"concurrent status transition event_id={event_id} "
            f"expected={from_status} actual={actual_status}"
        )

_mark_event_failed: безопасный перевод в failed из любого нетерминального статуса. Коммитит сама, правило одно, статус failed должен лечь в БД несмотря ни на что. Всё остальное потом. Вызывать только на чистом соединении, после rollback.

_mark_event_failed коммитит сама, если будешь рефакторить, это тебя укусит.

def _mark_event_failed(conn, event_id) -> bool:
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SET LOCAL lock_timeout = '2s'")
            cur.execute(
                "SELECT status FROM payment_events WHERE event_id = %s FOR UPDATE NOWAIT",
                (event_id,)
            )
            row = cur.fetchone()
            if row is None:
                conn.rollback()
                return False
            current = row["status"]
            if current in TERMINAL_STATUSES:
                conn.rollback()
                return False
            try:
                transition_event_status(cur, event_id, current, "failed")
                conn.commit()
                return True
            except (ValueError, RetryableError, AlreadyProcessedError):
                conn.rollback()
                return False
    except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled):
        try:
            conn.rollback()
        except Exception:
            pass
        return False
    except Exception:
        try:
            conn.rollback()
        except Exception:
            pass
        raise

SELECT FOR UPDATE NOWAIT + lock_timeout

С обычным FOR UPDATE воркер молча ждёт лок, блокируя thread. NOWAIT это убирает.

Миграция с ALTER TABLE блокирует таблицу целиком и при этом lock_timeout = '2s' не даёт воркеру висеть всё это время.

Про коды ошибок: lock_timeout бросает LockNotAvailable (pgcode 55P03), statement_timeout бросает QueryCanceled (pgcode 57014). Путаница приводит к непойманному исключению в продакшене. DeadlockDetected (pgcode 40P01): транзакция убита PostgreSQL из-за цикла блокировок, тоже transient, тоже retryable. PostgreSQL сам выбирает жертву и откатывает её транзакцию, повтор решает проблему. Все три нужно ловить вместе.

Пул соединений с валидацией

Соединение в пуле может быть мёртвым: PostgreSQL закрывает idle-соединения через tcp_keepalives_idle или idle_in_transaction_session_timeout. Без проверки воркер получит разорванный TCP и упадёт с InterfaceError в случайный момент.

_PooledConn: обёртка над соединением, которая знает как вернуть его владельцу. Через putconn() обратно в пул или через close() если соединение создавалось напрямую. putconn() идемпотентен, второй вызов no-op. getattr не проксирует dunder-методы, поэтому _PooledConn нельзя использовать как context manager. Весь код работает через conn.cursor().

get_validated_conn делает три уровня проверки без I/O в нормальном потоке: сначала conn.closed (in-memory флаг), потом conn.status (грязная транзакция из предыдущего использования), и только если статус не STATUS_READY, делает SELECT 1.

class _PooledConn:
    def __init__(self, conn, pool=None):
        self._conn = conn
        self._pool = pool

    def putconn(self, close=False):
        if self._pool is None:
            try:
                self._conn.close()
            except Exception:
                pass
            return
        pool, self._pool = self._pool, None  # idempotent: second call is a no-op
        try:
            pool.putconn(self._conn, close=close)
        except Exception:
            pass

    def __getattr__(self, name):
        return getattr(self._conn, name)

def get_validated_conn(pool: psycopg2.pool.SimpleConnectionPool) -> "_PooledConn":
    try:
        conn = pool.getconn()
    except psycopg2.pool.PoolError as e:
        raise RetryableError(f"DB connection pool exhausted: {e}")

    if conn.closed != 0:
        try:
            pool.putconn(conn, close=True)
        except Exception:
            pass
        direct = psycopg2.connect(dsn=settings.DATABASE_URL)
        return _PooledConn(direct, pool=None)

    if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
        try:
            conn.rollback()
            logger.warning("get_validated_conn: rolled back dirty connection")
        except Exception:
            try:
                pool.putconn(conn, close=True)
            except Exception:
                pass
            direct = psycopg2.connect(dsn=settings.DATABASE_URL)
            return _PooledConn(direct, pool=None)

    if conn.status != psycopg2.extensions.STATUS_READY:
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
        except Exception:
            try:
                pool.putconn(conn, close=True)
            except Exception:
                pass
            direct = psycopg2.connect(dsn=settings.DATABASE_URL)
            return _PooledConn(direct, pool=None)

    return _PooledConn(conn, pool)

Deposit и withdrawal

При INSERT INTO processed_events два исхода: успех, идём дальше (first-time path). UniqueViolation, событие уже видели (replay path).

На replay path смотрим outcome. Если success, синхронизируем статус payment_events с реальностью. Если pending, другой воркер ещё в середине транзакции, бросаем RetryableError для немедленного retry вместо ожидания recover_stale_enqueued_events через 3 минуты.

retry_count сбрасывается в 0 при успешной обработке. Без этого: событие попало в retry, обработалось успешно с retry_count=7, через 14+ дней processed_events вычистился по TTL, событие пришло снова через reorg compensation. Стартует уже с 7/10 до DLQ вместо 0/10.

Отдельный случай в replay: processed_events говорит что всё ок, а payment_events об этом событии ничего не знает. Такого быть не должно. Логируем, алертим, не паникуем.

def process_deposit_sync(conn, event_id, event_type, user_id, amount):
    amount_decimal  = _validate_amount(amount)
    idempotency_key = _idempotency_key(event_id, event_type)

    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SET LOCAL lock_timeout = '2s'")
        cur.execute("SET LOCAL statement_timeout = '5s'")

        try:
            cur.execute(
                "INSERT INTO processed_events (idempotency_key, outcome) "
                "VALUES (%s, 'pending')",
                (idempotency_key,),
            )
        except psycopg2.errors.UniqueViolation:
            conn.rollback()
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur2:
                cur2.execute("SET LOCAL lock_timeout = '2s'")
                cur2.execute("SET LOCAL statement_timeout = '5s'")
                cur2.execute(
                    "SELECT outcome FROM processed_events WHERE idempotency_key = %s",
                    (idempotency_key,),
                )
                row = cur2.fetchone()
                outcome = row["outcome"] if row else "pending"

                if outcome == "success":
                    cur2.execute(
                        "SELECT status FROM payment_events WHERE event_id = %s",
                        (event_id,)
                    )
                    r = cur2.fetchone()
                    current = r["status"] if r else None
                    if current is None:
                        logger.error(
                            "deposit_replay: orphan event, processed_events "
                            "exists but payment_event not found",
                            event_id=event_id,
                        )
                        send_alert(
                            f"[CRITICAL] orphan deposit event: {event_id}",
                            alert_key=f"orphan_deposit:{event_id}",
                        )
                    elif current == "enqueued":
                        transition_event_status(cur2, event_id, "enqueued", "confirmed")
                    elif current == "processing":
                        transition_event_status(cur2, event_id, "processing", "confirmed")
                    elif current == "confirmed":
                        pass
                    else:
                        conn.rollback()
                        raise RetryableError(
                            f"deposit_replay FSM violation: "
                            f"payment_event.status={current!r} with outcome=success "
                            f"for event_id={event_id}"
                        )
                    conn.commit()
                    return

            conn.rollback()
            raise RetryableError(
                f"deposit idempotency hit with outcome={outcome!r} for event_id={event_id}"
            )

        except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
                psycopg2.errors.DeadlockDetected) as e:
            conn.rollback()
            raise RetryableError(f"timeout on deposit for user {user_id}: {e}")

        try:
            transition_event_status(cur, event_id, "enqueued", "processing")

            cur.execute("SELECT id FROM users WHERE id = %s", (user_id,))
            if cur.fetchone() is None:
                conn.rollback()
                raise ValueError(f"user {user_id} not found")

            cur.execute(
                "UPDATE users SET balance = balance + %s WHERE id = %s",
                (amount_decimal, user_id),
            )
            if cur.rowcount == 0:
                conn.rollback()
                raise ValueError(f"user {user_id} disappeared between SELECT and UPDATE")
        except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
                psycopg2.errors.DeadlockDetected) as e:
            conn.rollback()
            raise RetryableError(f"lock/timeout on deposit first-time path for user {user_id}: {e}")

        try:
            cur.execute(
                "INSERT INTO balance_events "
                "(user_id, amount, event_type, source_event_id, created_at) "
                "VALUES (%s, %s, %s, %s, NOW())",
                (user_id, amount_decimal, event_type, event_id),
            )
        except psycopg2.errors.UniqueViolation:
            conn.rollback()
            raise Exception(
                f"balance_events duplicate without idempotency key violation "
                f"event_id={event_id}, investigate immediately"
            )

        cur.execute(
            "UPDATE processed_events SET outcome = 'success' WHERE idempotency_key = %s",
            (idempotency_key,),
        )
        cur.execute(
            "UPDATE payment_events SET retry_count = 0 WHERE event_id = %s",
            (event_id,),
        )
        transition_event_status(cur, event_id, "processing", "confirmed")
        conn.commit()

WithdrawalOutcome = Literal["success", "insufficient_funds"]

def process_withdrawal_sync(conn, event_id, event_type, user_id, amount) -> WithdrawalOutcome:
    amount_decimal  = _validate_amount(amount)
    idempotency_key = _idempotency_key(event_id, event_type)

    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SET LOCAL lock_timeout = '2s'")
        cur.execute("SET LOCAL statement_timeout = '5s'")

        try:
            cur.execute(
                "INSERT INTO processed_events (idempotency_key, outcome) "
                "VALUES (%s, 'pending')",
                (idempotency_key,),
            )
        except psycopg2.errors.UniqueViolation:
            conn.rollback()
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as lc:
                lc.execute("SET LOCAL lock_timeout = '2s'")
                lc.execute("SET LOCAL statement_timeout = '5s'")
                lc.execute(
                    "SELECT outcome FROM processed_events WHERE idempotency_key = %s",
                    (idempotency_key,),
                )
                stored = (lc.fetchone() or {"outcome": "pending"})["outcome"]

                if stored == "success":
                    lc.execute(
                        "SELECT status FROM payment_events WHERE event_id = %s",
                        (event_id,)
                    )
                    r = lc.fetchone()
                    current = r["status"] if r else None
                    if current is None:
                        logger.error(
                            "withdrawal_replay: orphan event, processed_events "
                            "exists but payment_event not found",
                            event_id=event_id,
                        )
                        send_alert(
                            f"[CRITICAL] orphan withdrawal event: {event_id}",
                            alert_key=f"orphan_withdrawal:{event_id}",
                        )
                    elif current == "enqueued":
                        transition_event_status(lc, event_id, "enqueued", "confirmed")
                    elif current == "processing":
                        transition_event_status(lc, event_id, "processing", "confirmed")
                    elif current == "confirmed":
                        pass
                    else:
                        conn.rollback()
                        raise RetryableError(
                            f"withdrawal_replay FSM violation: "
                            f"payment_event.status={current!r} with outcome=success "
                            f"for event_id={event_id}"
                        )
                    conn.commit()
                    return "success"
                elif stored == "insufficient_funds":
                    lc.execute(
                        "SELECT status FROM payment_events WHERE event_id = %s",
                        (event_id,)
                    )
                    r = lc.fetchone()
                    current = r["status"] if r else None
                    if current == "enqueued":
                        transition_event_status(lc, event_id, "enqueued", "failed")
                    elif current == "processing":
                        transition_event_status(lc, event_id, "processing", "failed")
                    elif current is None:
                        logger.error(
                            "withdrawal_replay insufficient_funds: orphan event",
                            event_id=event_id,
                        )
                        send_alert(
                            f"[CRITICAL] orphan withdrawal (insufficient_funds): {event_id}",
                            alert_key=f"orphan_withdrawal_insuf:{event_id}",
                        )
                    conn.commit()
                    return "insufficient_funds"
                else:
                    conn.rollback()
                    raise RetryableError(
                        f"withdrawal idempotency hit with outcome={stored!r} "
                        f"for event_id={event_id}"
                    )

        except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
                psycopg2.errors.DeadlockDetected) as e:
            conn.rollback()
            raise RetryableError(f"lock/timeout on withdrawal INSERT for user {user_id}: {e}")

        try:
            cur.execute(
                "SELECT balance FROM users WHERE id = %s FOR UPDATE NOWAIT",
                (user_id,),
            )
        except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
                psycopg2.errors.DeadlockDetected) as e:
            conn.rollback()
            raise RetryableError(f"lock/timeout/deadlock on user lock for user {user_id}: {e}")

        row = cur.fetchone()
        if row is None:
            conn.rollback()
            raise ValueError(f"user {user_id} not found")

        try:
            if row["balance"] < amount_decimal:
                cur.execute(
                    "UPDATE processed_events SET outcome = 'insufficient_funds' "
                    "WHERE idempotency_key = %s",
                    (idempotency_key,),
                )
                transition_event_status(cur, event_id, "enqueued", "failed")
                conn.commit()
                return "insufficient_funds"

            transition_event_status(cur, event_id, "enqueued", "processing")

            cur.execute(
                "UPDATE users SET balance = balance - %s WHERE id = %s",
                (amount_decimal, user_id),
            )

            try:
                cur.execute(
                    "INSERT INTO balance_events "
                    "(user_id, amount, event_type, source_event_id, created_at) "
                    "VALUES (%s, %s, %s, %s, NOW())",
                    (user_id, -amount_decimal, event_type, event_id),
                )
            except psycopg2.errors.UniqueViolation:
                conn.rollback()
                raise Exception(
                    f"balance_events duplicate without idempotency key violation "
                    f"event_id={event_id}, investigate immediately"
                )

            cur.execute(
                "UPDATE processed_events SET outcome = 'success' WHERE idempotency_key = %s",
                (idempotency_key,),
            )
            cur.execute(
                "UPDATE payment_events SET retry_count = 0 WHERE event_id = %s",
                (event_id,),
            )
            transition_event_status(cur, event_id, "processing", "confirmed")
            conn.commit()
            return "success"
        except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
                psycopg2.errors.DeadlockDetected) as e:
            conn.rollback()
            raise RetryableError(f"lock/timeout on withdrawal path for user {user_id}: {e}")

Webhook: outbox pattern вместо прямого .delay()

Прямой вызов .delay() из webhook handler создаёт окно между записью в БД и постановкой в очередь. Если процесс упал в этот момент, событие зависнет в pending навсегда.

Решение: webhook только пишет в БД. Отдельный poller каждые 5 секунд берёт pending события, сначала атомарно меняет статусы и коммитит, и только после этого ставит в Celery очередь. Сначала коммитим, потом кладём в очередь, иначе воркер стартует раньше чем БД знает об enqueued.

Alchemy и Infura знают только tx-hash и адрес получателя. Маппинг to_address в user_id делается отдельным запросом к таблице deposit_addresses. Этот слой вынесен из статьи, но без него злоумышленник с HMAC-ключом зачислит деньги на любой user_id. Стоит держать это в голове.

verify_webhook_signature принимает raw_body байтами до JSON-парсинга, подпись считается по исходным байтам. secrets.compare_digest защищает от timing attack.

import asyncpg
from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()

limiter = Limiter(key_func=get_remote_address, storage_uri=settings.REDIS_URL)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

ALLOWED_EVENT_TYPES = frozenset({"deposit", "airdrop", "withdrawal", "withdrawal_fee"})

def verify_webhook_signature(raw_body, signature_header, signing_key):
    if not signing_key:
        raise ImproperlyConfigured("WEBHOOK_SECRET is not set")
    if len(signing_key) < 32:
        raise ImproperlyConfigured(
            f"WEBHOOK_SECRET too short: {len(signing_key)} chars, minimum 32"
        )
    if not signature_header:
        return False
    mac = hmac.new(
        key=signing_key.encode("utf-8"),
        msg=raw_body,
        digestmod=hashlib.sha256,
    )
    return secrets.compare_digest(mac.hexdigest(), signature_header)

@app.post("/webhook/payments")
@limiter.limit("300/minute")
@limiter.limit("30/second")
async def payment_webhook(request: Request):
    raw_body  = await request.body()
    signature = request.headers.get("X-Alchemy-Signature", "")

    if not verify_webhook_signature(raw_body, signature, settings.WEBHOOK_SECRET):
        raise HTTPException(status_code=401, detail="invalid signature")

    trace_id = (
        request.headers.get("X-Request-ID")
        or request.headers.get("X-Alchemy-Request-ID")
        or new_trace_id()
    )
    set_trace_id(trace_id)

    try:
        payload    = json.loads(raw_body)
        event_id   = payload["event_id"]
        event_type = payload["event_type"]
        user_id    = payload["user_id"]
        if not isinstance(payload.get("amount"), str):
            raise HTTPException(status_code=400, detail="amount must be a JSON string, not a number")
        amount_str = payload["amount"]
    except (json.JSONDecodeError, KeyError) as e:
        raise HTTPException(status_code=400, detail=f"invalid payload: {e}")

    if event_type not in ALLOWED_EVENT_TYPES:
        raise HTTPException(status_code=400, detail=f"unknown event_type: {event_type!r}")

    try:
        _validate_amount(amount_str)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"invalid amount: {e}")

    db = request.app.state.db
    try:
        async with db.transaction():
            await db.fetchrow(
                "INSERT INTO payment_events (event_id, user_id, amount, event_type, status) "
                "VALUES ($1, $2, $3, $4, 'pending') ON CONFLICT (event_id) DO NOTHING",
                event_id, user_id, amount_str, event_type,
            )
    except asyncpg.exceptions.ForeignKeyViolationError:
        logger.warning(
            "orphan webhook event (user not found)",
            event_id=event_id, user_id=user_id,
        )
        raise HTTPException(status_code=400, detail="user not found")

    return {"status": "accepted", "trace_id": trace_id}

FastAPI работает в async event loop, блокирующий psycopg2 там убьёт throughput, поэтому asyncpg в webhook. Celery workers, отдельные процессы без event loop, asyncpg там только добавит сложность.

enqueue_pending_events использует тот же паттерн SAVEPOINT что и recover_stale_enqueued_events. Без него один event с AlreadyProcessedError от race с recover'ом откатывал весь батч из 100 событий. Они оставались в pending и подхватывались следующим тиком, но это видно в логах как потерянный тик. SAVEPOINT sp_enq на каждый event изолирует ошибку.

@shared_task(name="enqueue_pending_events")
def enqueue_pending_events() -> dict:
    conn = get_validated_conn(db_pool)
    events_to_enqueue = []
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT event_id, event_type, user_id, amount
                FROM payment_events
                WHERE status = 'pending'
                  AND updated_at < NOW() - INTERVAL '5 seconds'
                ORDER BY created_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            """)
            events = cur.fetchall()
            events_ok = []
            for event in events:
                try:
                    cur.execute("SAVEPOINT sp_enq")
                    transition_event_status(cur, event['event_id'], "pending", "enqueued")
                    cur.execute("RELEASE SAVEPOINT sp_enq")
                    events_ok.append(event)
                except (ValueError, RetryableError, AlreadyProcessedError) as e:
                    cur.execute("ROLLBACK TO SAVEPOINT sp_enq")
                    cur.execute("RELEASE SAVEPOINT sp_enq")
                    logger.warning("enqueue: skipped event",
                                   event_id=event['event_id'], error=str(e))
            conn.commit()
            events_to_enqueue = list(events_ok)
    except Exception:
        try:
            conn.rollback()
        except Exception:
            pass
        logger.exception("enqueue_pending_events: transition failed")
        raise
    finally:
        try:
            conn.putconn()
        except Exception:
            pass

    enqueued = 0
    for event in events_to_enqueue:
        try:
            process_payment_event.apply_async(
                args=[event['event_id'], event['event_type'],
                      event['user_id'], str(event['amount'])],
                kwargs={"trace_id": str(uuid.uuid4())},
            )
            enqueued += 1
        except Exception:
            logger.exception("apply_async failed", event_id=event['event_id'])

    logger.info("enqueue_pending_events done", enqueued=enqueued)
    return {"enqueued": enqueued}

recover_stale_enqueued_events каждые 2 минуты находит события, застрявшие в enqueued. После MAX_RECOVERY_ATTEMPTS попыток переводит в failed + DLQ. SAVEPOINT на каждое событие, ошибка в одном не откатывает весь батч.

MAX_RECOVERY_ATTEMPTS = 10

@shared_task(name="recover_stale_enqueued_events")
def recover_stale_enqueued_events() -> dict:
    conn      = get_validated_conn(db_pool)
    recovered = 0
    dlqed     = 0
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT event_id, event_type, user_id, amount, retry_count
                FROM payment_events
                WHERE status = 'enqueued'
                  AND updated_at < NOW() - INTERVAL '3 minutes'
                ORDER BY created_at
                LIMIT 50
                FOR UPDATE SKIP LOCKED
            """)
            stale = cur.fetchall()
            for event in stale:
                try:
                    cur.execute("SAVEPOINT sp_recover")
                    if event['retry_count'] >= MAX_RECOVERY_ATTEMPTS:
                        transition_event_status(cur, event['event_id'], "enqueued", "failed")
                        cur.execute(
                            "INSERT INTO dead_letter_queue "
                            "(event_id, event_type, user_id, amount, error) "
                            "VALUES (%s, %s, %s, %s, %s)",
                            (event['event_id'], event['event_type'], event['user_id'],
                             str(event['amount']),
                             f"exhausted recovery attempts ({MAX_RECOVERY_ATTEMPTS})")
                        )
                        dlqed += 1
                    else:
                        transition_event_status(cur, event['event_id'], "enqueued", "pending")
                        cur.execute(
                            "UPDATE payment_events SET retry_count = retry_count + 1 "
                            "WHERE event_id = %s",
                            (event['event_id'],)
                        )
                        recovered += 1
                    cur.execute("RELEASE SAVEPOINT sp_recover")
                except Exception as sp_exc:
                    try:
                        cur.execute("ROLLBACK TO SAVEPOINT sp_recover")
                        cur.execute("RELEASE SAVEPOINT sp_recover")
                    except Exception:
                        pass
                    logger.error("recover: event skipped due to error",
                                 event_id=event['event_id'], error=str(sp_exc))
            conn.commit()

        if recovered or dlqed:
            logger.warning("recover_stale_enqueued_events",
                          recovered=recovered, dlqed=dlqed)
        if dlqed:
            send_alert(
                f"[WARNING] {dlqed} events exhausted recovery attempts, check DLQ",
                alert_key="recovery_exhausted",
            )
        return {"recovered": recovered, "dlqed": dlqed}
    except Exception:
        try:
            conn.rollback()
        except Exception:
            pass
        logger.exception("recover_stale_enqueued_events failed")
        raise
    finally:
        try:
            conn.putconn()
        except Exception:
            pass

Celery: acks_late + reject_on_worker_lost + пул соединений

acks_late=True: подтверждение брокеру после завершения обработки, не в момент получения. По умолчанию Celery подтверждает сразу: воркер падает в середине, задача потеряна.

reject_on_worker_lost=True: при SIGKILL/OOM задача возвращается в очередь.

Если создать пул до fork, все воркеры наследуют одни и те же файловые дескрипторы. Два процесса шлют запросы в один сокет и ответы перемешиваются, поэтому пул создаётся в worker_process_init.

import os
from celery.signals import worker_process_init

db_pool = None
_local_breaker = None

@worker_process_init.connect
def init_worker(**kwargs):
    global db_pool, _local_breaker

    _local_breaker = _InProcessBreaker()

    worker_pool = os.environ.get("CELERY_WORKER_POOL", "prefork")
    is_threaded = worker_pool in ("gevent", "eventlet")
    pool_class  = (
        psycopg2.pool.ThreadedConnectionPool if is_threaded
        else psycopg2.pool.SimpleConnectionPool
    )
    db_pool = pool_class(minconn=2, maxconn=10, dsn=settings.DATABASE_URL)
    logger.info("worker init done", pool=pool_class.__name__, worker_pool=worker_pool)

def _get_local_breaker() -> "_InProcessBreaker":
    global _local_breaker
    if _local_breaker is None:
        _local_breaker = _InProcessBreaker()
    return _local_breaker

BACKOFF_BASE_SEC = 1
BACKOFF_CAP_SEC  = 60

def jittered_backoff(attempt: int) -> float:
    cap = min(BACKOFF_CAP_SEC, BACKOFF_BASE_SEC * (2 ** attempt))
    return random.uniform(0, cap)

Двухуровневый DLQ

Сначала делал LPUSH и EXPIRE двумя отдельными командами. Между ними возможен crash, ключ остаётся без TTL, живёт вечно. Фикс через pipeline.

Про схему DLQ: таблица использует BIGSERIAL PRIMARY KEY, не event_id PRIMARY KEY. Это позволяет хранить несколько попыток для одного event_id. Следствие: ON CONFLICT (event_id) DO NOTHING недопустим, event_id не имеет UNIQUE constraint. Каждый INSERT создаёт новую запись с полной историей попытки.

DLQ_REDIS_KEY = "dlq:payment_events"
DLQ_REDIS_TTL = 7 * 24 * 3600  # 7 дней

def save_to_dlq_sync(conn, event_id, event_type, user_id, amount, error):
    payload = {
        "event_id": event_id, "event_type": event_type,
        "user_id": user_id, "amount": str(amount),
        "error": error, "trace_id": get_trace_id(),
    }

    db_ok = False
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO dead_letter_queue "
                "(event_id, event_type, user_id, amount, error, created_at) "
                "VALUES (%s, %s, %s, %s, %s, NOW())",
                (event_id, event_type, user_id, str(amount), error)
            )
            conn.commit()
        db_ok = True
    except Exception as db_exc:
        logger.error("DLQ postgres write failed", event_id=event_id, error=str(db_exc))
        try:
            conn.rollback()
        except Exception:
            pass

    if db_ok:
        return

    try:
        pipe = _redis_client.pipeline()
        pipe.lpush(DLQ_REDIS_KEY, json.dumps(payload))
        pipe.expire(DLQ_REDIS_KEY, DLQ_REDIS_TTL)
        pipe.execute()
        logger.warning("DLQ saved to Redis fallback", event_id=event_id)
        return
    except redis_lib.RedisError as e:
        logger.error("DLQ redis write failed", event_id=event_id, error=str(e))

    logger.critical(
        "DLQ_UNRECOVERABLE",
        event_id=event_id, event_type=event_type,
        user_id=user_id, amount=str(amount),
        error=error, trace_id=get_trace_id(),
        dlq_payload=payload,
    )

drain_redis_dlq запускается по расписанию после инцидента с БД. failed сбрасывается в 0 при каждом успешном INSERT, счётчик consecutive, не total. Чередующиеся успехи и ошибки не триггерят аварийный break, но drained > 0 при этом.

DRAIN_BATCH_SIZE = 500

@shared_task(name="drain_redis_dlq")
def drain_redis_dlq() -> dict:
    drained = 0
    failed  = 0
    conn    = get_validated_conn(db_pool)
    try:
        for _ in range(DRAIN_BATCH_SIZE):
            raw = _redis_client.rpop(DLQ_REDIS_KEY)
            if raw is None:
                break
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                logger.critical(
                    "drain_redis_dlq: malformed JSON in DLQ, item discarded",
                    raw=raw[:200],
                )
                continue
            try:
                with conn.cursor() as cur:
                    cur.execute(
                        "INSERT INTO dead_letter_queue "
                        "(event_id, event_type, user_id, amount, error, created_at) "
                        "VALUES (%s, %s, %s, %s, %s, NOW())",
                        (payload['event_id'], payload['event_type'],
                         payload['user_id'], payload['amount'], payload['error'])
                    )
                    conn.commit()
                drained += 1
                failed = 0
            except Exception as e:
                try:
                    conn.rollback()
                except Exception:
                    pass
                pipe = _redis_client.pipeline()
                pipe.lpush(DLQ_REDIS_KEY, raw)
                pipe.expire(DLQ_REDIS_KEY, DLQ_REDIS_TTL)
                pipe.execute()
                failed += 1
                logger.error(
                    "drain failed, requeued to head",
                    event_id=payload.get('event_id'), error=str(e),
                )
                if failed >= 10:
                    logger.error("drain aborted after 10 consecutive failures")
                    break
    finally:
        conn.putconn()

    logger.info("drain_redis_dlq done", drained=drained, failed=failed)
    return {"drained": drained, "failed": failed}

Celery task: полная версия

@shared_task(name="process_payment_event", bind=True, max_retries=5, acks_late=True, reject_on_worker_lost=True)
def process_payment_event(self, event_id, event_type, user_id, amount, trace_id=""):
    set_trace_id(trace_id or new_trace_id())

    conn      = get_validated_conn(db_pool)
    committed = False
    conn_ok   = True

    try:
        if event_type in ("deposit", "airdrop"):
            process_deposit_sync(conn, event_id, event_type, user_id, amount)
            committed = True
        elif event_type in ("withdrawal", "withdrawal_fee"):
            outcome = process_withdrawal_sync(conn, event_id, event_type, user_id, amount)
            committed = True
            if outcome == "insufficient_funds":
                notify_user_insufficient_funds(user_id)
        else:
            logger.error("unknown event_type", event_type=event_type, event_id=event_id)
            try:
                conn.rollback()
            except Exception:
                pass
            try:
                _mark_event_failed(conn, event_id)
            except Exception as mark_exc:
                logger.error("_mark_event_failed raised",
                            event_id=event_id, error=str(mark_exc))
            save_to_dlq_sync(
                conn, event_id, event_type, user_id, amount,
                f"unknown event_type: {event_type!r}"
            )
            raise Ignore()
    except AlreadyProcessedError as exc:
        logger.info("event already processed", event_id=event_id, reason=str(exc))
        raise Ignore()
    except RetryableError as exc:
        delay = jittered_backoff(self.request.retries)
        logger.warning(
            "retrying", event_id=event_id,
            attempt=self.request.retries, delay=delay, reason=str(exc),
        )
        try:
            raise self.retry(exc=exc, countdown=delay)
        except MaxRetriesExceededError:
            conn.rollback()
            _mark_event_failed(conn, event_id)
            save_to_dlq_sync(conn, event_id, event_type, user_id, amount,
                             f"retries exhausted: {exc}")
            raise Ignore()
    except Ignore:
        raise
    except Exception as exc:
        logger.exception("unhandled error", event_id=event_id)
        try:
            conn.rollback()
        except Exception:
            pass
        try:
            _mark_event_failed(conn, event_id)
        except Exception as mark_exc:
            logger.error("_mark_event_failed raised in catch-all",
                        event_id=event_id, error=str(mark_exc))
        try:
            save_to_dlq_sync(conn, event_id, event_type, user_id, amount, str(exc))
        except Exception as dlq_exc:
            logger.critical(
                "DLQ write failed, manual recovery required",
                event_id=event_id, trace_id=get_trace_id(),
                original_error=str(exc), dlq_error=str(dlq_exc),
            )
        self.update_state(state="FAILURE", meta={"error": str(exc)})
        raise Ignore()
    finally:
        if not committed:
            try:
                conn.rollback()
            except Exception as rb_exc:
                logger.error("rollback failed", event_id=event_id, error=str(rb_exc))
                conn_ok = False
        conn.putconn(close=not conn_ok)

def notify_user_insufficient_funds(user_id: int) -> None:
    pass

notify_user_insufficient_funds: заглушка. В проде нужна outbox-запись внутри process_withdrawal_sync до финального commit. Вызов отсюда (после commit) означает out-of-band: отдельная транзакция, отдельные гарантии доставки.

Здесь скрыта ловушка специфичная именно для at-least-once. Провайдер присылает одно событие трижды process_payment_event отработает трижды. Баланс не изменится (idempotency через unique constraint). Но notify вызовется трижды, пользователь получит три уведомления «недостаточно средств» вместо одного. Идемпотентность БД не распространяется на side-effects вне транзакции автоматически.

Вторая проблема с этим placement: если notify бросит исключение, он попадёт в catch-all except Exception, который запишет событие в DLQ, хотя деньги уже списаны корректно и business-транзакция закоммичена. Это шум в DLQ который будет скрывать реальные инциденты.

Circuit breaker для Web3 RPC

Трёхфазный: closed → open → half-open → closed. Redis как shared state между инстансами, in-process breaker как fallback когда Redis недоступен.

_InProcessBreaker: простой per-process счётчик с lock. Открывается приRPC_ERROR_THRESHOLD ошибках, закрывается автоматически после RPC_COOLDOWN_SEC секунд. Нужен именно как fallback: если Redis сам недоступен, circuit breaker не должен перестать работать.

Нужно запустить только один тестовый запрос пока breaker открыт, и тогда SET nx=True гарантирует, что только первый воркер получит право, а остальные увидят занятый probe key.

import web3
from prometheus_client import Counter

rpc_errors_total = Counter("web3_rpc_errors_total", "Web3 RPC failures", ["method"])

_w3 = web3.Web3(web3.HTTPProvider(settings.ETH_RPC_URL))

FINALIZED_CACHE_TTL   = 86_400
PENDING_CACHE_TTL     = 30
CACHE_PREFIX          = "eth:fin:"
RPC_ERROR_THRESHOLD   = 5
RPC_COOLDOWN_SEC      = 60
RPC_ERROR_WINDOW_SEC  = 30
HALF_OPEN_PROBE_KEY   = "circuit:web3:half_open_probe"
HALF_OPEN_PROBE_TTL   = 10

@dataclass
class _InProcessBreaker:
    _lock:       threading.Lock = field(default_factory=threading.Lock)
    _errors:     int            = 0
    _open_until: "datetime | None" = None

    def is_open(self) -> bool:
        with self._lock:
            if self._open_until is None:
                return False
            if datetime.now(timezone.utc) > self._open_until:
                self._open_until = None
                self._errors = 0
                return False
            return True

    def record_error(self) -> None:
        with self._lock:
            self._errors += 1
            if self._errors >= RPC_ERROR_THRESHOLD:
                self._open_until = datetime.now(timezone.utc) + timedelta(seconds=RPC_COOLDOWN_SEC)

    def record_success(self) -> None:
        with self._lock:
            self._errors     = 0
            self._open_until = None

def _is_circuit_open() -> bool:
    try:
        if _redis_client.get("circuit:web3:open"):
            is_probe = _redis_client.set(
                HALF_OPEN_PROBE_KEY, "1", nx=True, ex=HALF_OPEN_PROBE_TTL
            )
            if is_probe:
                return False
            return True
    except redis_lib.RedisError:
        pass
    return _get_local_breaker().is_open()

def _record_rpc_error(method: str) -> None:
    rpc_errors_total.labels(method=method).inc()
    _get_local_breaker().record_error()
    try:
        pipe  = _redis_client.pipeline()
        pipe.incr("circuit:web3:errors")
        pipe.expire("circuit:web3:errors", RPC_ERROR_WINDOW_SEC)
        count, _ = pipe.execute()
        if count >= RPC_ERROR_THRESHOLD:
            open_pipe = _redis_client.pipeline()
            open_pipe.setex("circuit:web3:open", RPC_COOLDOWN_SEC, "1")
            open_pipe.set(HALF_OPEN_PROBE_KEY, "1", ex=HALF_OPEN_PROBE_TTL)
            open_pipe.execute()
            logger.critical("web3 circuit breaker opened", count=count)
            send_alert(
                f"[CRITICAL] Web3 RPC circuit breaker OPEN, "
                f"{count} failures in {RPC_ERROR_WINDOW_SEC}s.",
                alert_key="web3_breaker_open",
            )
        else:
            if not _redis_client.exists("circuit:web3:open"):
                _redis_client.delete(HALF_OPEN_PROBE_KEY)
    except redis_lib.RedisError as e:
        logger.warning("circuit breaker state write failed", error=str(e))

def _record_rpc_success() -> None:
    _get_local_breaker().record_success()
    try:
        _redis_client.delete("circuit:web3:errors")
        _redis_client.delete(HALF_OPEN_PROBE_KEY)
        if _redis_client.delete("circuit:web3:open"):
            logger.info("web3 circuit breaker closed")
            send_alert(
                "[INFO] Web3 RPC circuit breaker CLOSED, recovered",
                alert_key="web3_breaker_closed",
            )
    except redis_lib.RedisError:
        pass

def is_transaction_finalized(tx_hash: str) -> bool:
    if _is_circuit_open():
        logger.warning("web3 circuit open, skipping", tx_hash=tx_hash)
        return False

    cache_key = f"{CACHE_PREFIX}{tx_hash}"
    try:
        cached = _redis_client.get(cache_key)
        if cached == "1":
            return True
        if cached == "0":
            return False
    except redis_lib.RedisError as e:
        logger.warning("redis cache unavailable", tx_hash=tx_hash, error=str(e))

    method = "get_transaction_receipt"
    try:
        receipt = _w3.eth.get_transaction_receipt(tx_hash)
        if receipt is None:
            return False
        method = "get_block_finalized"
        finalized_block = _w3.eth.get_block("finalized")["number"]
        result = receipt["blockNumber"] <= finalized_block
        _record_rpc_success()
    except Exception as e:
        logger.error("eth rpc error", tx_hash=tx_hash, error=str(e))
        _record_rpc_error(method)
        return False

    try:
        ttl = FINALIZED_CACHE_TTL if result else PENDING_CACHE_TTL
        _redis_client.setex(cache_key, ttl, "1" if result else "0")
    except redis_lib.RedisError as e:
        logger.warning("redis cache write failed", tx_hash=tx_hash, error=str(e))

    return result

Мониторинг

hot_path_balance_check: не полноценная финансовая сверка, а только мониторинг горячего пути за последние 10 минут. users.balance и SUM(balance_events) читаются одним JOIN-запросом, снимок данных берётся атомарно. REPEATABLE_READ здесь defensive engineering: гарантирует консистентный снимок на уровне транзакции и защищает от phantom reads если транзакция вырастет до нескольких операторов в будущем. Для полной исторической сверки нужна ночная задача, это в бэклоге.

set_isolation_level может бросить если соединение разорвано. Без try/except вокруг него соединение вернётся в пул с REPEATABLE_READ, следующий пользователь не будет ожидать такого поведения.

from prometheus_client import Counter
hot_path_runs = Counter("hot_path_balance_check_runs_total", "Runs of hot path balance check")

@shared_task(name="hot_path_balance_check")
def hot_path_balance_check() -> None:
    conn    = get_validated_conn(db_pool)
    conn_ok = True
    try:
        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ
        )
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute("SET LOCAL statement_timeout = '10s'")
                cur.execute("""
                    SELECT
                        u.id,
                        u.balance                                        AS actual_balance,
                        u.initial_balance + COALESCE(SUM(be.amount), 0) AS calculated_balance
                    FROM users u
                    INNER JOIN (
                        SELECT DISTINCT user_id FROM balance_events
                        WHERE created_at > NOW() - INTERVAL '10 minutes'
                    ) recent ON recent.user_id = u.id
                    LEFT JOIN balance_events be ON be.user_id = u.id
                    GROUP BY u.id, u.balance, u.initial_balance
                    HAVING u.balance != u.initial_balance + COALESCE(SUM(be.amount), 0)
                """)
                mismatches = cur.fetchall()
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            try:
                conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_DEFAULT)
            except Exception:
                conn_ok = False

        hot_path_runs.inc()

        if mismatches:
            send_alert(
                f"[CRITICAL] balance mismatch: {[dict(m) for m in mismatches]}",
                alert_key="balance_mismatch",
            )

    except Exception:
        logger.exception("hot_path_balance_check failed")
        try:
            conn.rollback()
        except Exception:
            pass
        send_alert(
            "[WARNING] hot_path_balance_check failed, check worker logs",
            alert_key="balance_check_failed",
        )
        raise
    finally:
        conn.putconn(close=not conn_ok)

@shared_task(name="alert_zombie_events")
def alert_zombie_events() -> None:
    conn = get_validated_conn(db_pool)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT event_id, user_id, status, updated_at FROM payment_events
                WHERE (status = 'processing' AND updated_at < NOW() - INTERVAL '5 minutes')
                   OR (status IN ('pending', 'enqueued') AND updated_at < NOW() - INTERVAL '15 minutes')
            """)
            zombies = cur.fetchall()

            cur.execute("""
                SELECT COUNT(*) AS dlq_size FROM dead_letter_queue
                WHERE created_at > NOW() - INTERVAL '1 hour'
            """)
            recent_dlq = cur.fetchone()['dlq_size']

            cur.execute("""
                SELECT COUNT(*) AS stuck FROM processed_events
                WHERE outcome = 'pending' AND created_at < NOW() - INTERVAL '10 minutes'
            """)
            stuck_pending = cur.fetchone()['stuck']

        if zombies:
            send_alert(
                f"[WARNING] zombie events: {[z['event_id'] for z in zombies[:20]]}",
                alert_key="zombie_events",
            )
        if recent_dlq > 10:
            send_alert(
                f"[WARNING] {recent_dlq} events in DLQ last hour, investigate",
                alert_key="dlq_flood",
            )
        if stuck_pending:
            send_alert(
                f"[CRITICAL] {stuck_pending} stuck 'pending' rows in processed_events, "
                f"architectural invariant broken, investigate urgently",
                alert_key="processed_events_stuck_pending",
            )
    finally:
        try:
            conn.rollback()
        except Exception:
            pass
        conn.putconn()

Prometheus-алерты:

- alert: HotPathBalanceCheckNotRunning
  expr: increase(hot_path_balance_check_runs_total[6m]) == 0
  for: 0m
- alert: Web3RPCCircuitOpen
  expr: increase(web3_rpc_errors_total[5m]) > 5
  for: 0m
- alert: CeleryHighRetryRate
  expr: rate(celery_tasks_total{state="retry"}[5m])
      / rate(celery_tasks_total{state="success"}[5m]) > 0.1
- alert: CeleryQueueDepth
  expr: celery_queue_depth > 500
  for: 5m

Beat расписание:

from celery.schedules import crontab

beat_schedule = {
    "enqueue-pending-events":        {"task": "enqueue_pending_events",        "schedule": 5.0},
    "recover-stale-enqueued-events": {"task": "recover_stale_enqueued_events", "schedule": 120.0},
    "cleanup-processed-events":      {"task": "cleanup_processed_events",      "schedule": crontab(hour=3, minute=0)},
    "hot-path-balance-check":        {"task": "hot_path_balance_check",        "schedule": 60.0},
    "alert-zombie-events":           {"task": "alert_zombie_events",           "schedule": 60.0},
    "drain-redis-dlq":               {"task": "drain_redis_dlq",               "schedule": crontab(hour=4, minute=0)},
}

Cleanup

processed_events чистит только terminal outcomes. outcome='pending' не трогается никогда. Это защита от тихого удаления событий в середине обработки. Соединение берётся и возвращается в пул на каждый батч, а не удерживается на всё время задачи.

TTL 14 дней безопасен благодаря UNIQUE (source_event_id, event_type) на balance_events: даже если провайдер реплеит событие спустя 14+ дней и processed_events уже очищен, повторный INSERT в balance_events отклоняется unique constraint. Даже если приложение облажается, unique constraint на balance_events не пропустит дубль. База будет последним рубежом.

CLEANUP_BATCH_SIZE    = 5_000
CLEANUP_BATCH_PAUSE   = 0.1
CLEANUP_MAX_BATCHES   = 200
CLEANUP_TTL_DAYS      = 14
CLEANUP_SAFE_STATUSES = ("success", "insufficient_funds")

@shared_task(name="cleanup_processed_events")
def cleanup_processed_events() -> dict:
    import time
    total_deleted = 0
    batches = 0
    for _ in range(CLEANUP_MAX_BATCHES):
        conn = get_validated_conn(db_pool)
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    DELETE FROM processed_events
                    WHERE idempotency_key IN (
                        SELECT idempotency_key FROM processed_events
                        WHERE outcome = ANY(%s)
                          AND created_at < NOW() - (%s * INTERVAL '1 day')
                        LIMIT %s
                        FOR UPDATE SKIP LOCKED
                    )
                """, (list(CLEANUP_SAFE_STATUSES), CLEANUP_TTL_DAYS, CLEANUP_BATCH_SIZE))
                deleted = cur.rowcount
                conn.commit()
        except Exception:
            try:
                conn.rollback()
            except Exception:
                pass
            raise
        finally:
            conn.putconn()
        total_deleted += deleted
        batches += 1
        if deleted < CLEANUP_BATCH_SIZE:
            break
        time.sleep(CLEANUP_BATCH_PAUSE)
    return {"batches": batches, "deleted": total_deleted}

Тесты и что они не покрывают

Idempotency на моках не проверишь. Unique constraint должен работать как в проде, значит нужна реальная БД. threading не подходит, GIL мешает нормально воспроизвести гонку. Поэтому multiprocessing.

Баланс может сойтись, а balance_events при этом задублирован. Проверяй оба. Именно так и бывает, баланс сходится, всё выглядит чисто, а дубли в balance_events всплывают только когда приходит аудитор.

import pytest
import psycopg2
import psycopg2.extras
import multiprocessing
from decimal import Decimal

TEST_DSN = "host=localhost port=5433 dbname=testdb user=testuser password=testuser"

@pytest.fixture(scope="session")
def db_conn_session():
    conn = psycopg2.connect(TEST_DSN)
    yield conn
    conn.close()

@pytest.fixture
def db_conn(db_conn_session):
    conn = db_conn_session
    conn.rollback()
    with conn.cursor() as cur:
        cur.execute("""
            TRUNCATE balance_events, processed_events, payment_events,
                     dead_letter_queue, users
            RESTART IDENTITY CASCADE
        """)
        cur.execute(
            "INSERT INTO users (id, balance, initial_balance) VALUES (1, 100.0, 100.0)"
        )
    conn.commit()
    yield conn
    try:
        conn.rollback()
    except Exception:
        pass

def _deposit_worker(dsn, event_id, event_type, user_id, amount, barrier, q):
    conn = psycopg2.connect(dsn)
    try:
        barrier.wait(timeout=10)
        process_deposit_sync(conn, event_id, event_type, user_id, amount)
        q.put(("ok", None))
    except Exception as e:
        q.put(("err", f"{type(e).__name__}: {e}"))
    finally:
        conn.close()

def test_duplicate_deposits_produce_single_credit(db_conn):
    """10 воркеров, ОДИН event_id, имитация at-least-once delivery."""
    with db_conn.cursor() as cur:
        cur.execute(
            "INSERT INTO payment_events (event_id, user_id, amount, event_type, status) "
            "VALUES ('evt_dup', 1, '50.0', 'deposit', 'enqueued')"
        )
    db_conn.commit()

    N = 10
    barrier = multiprocessing.Barrier(N)
    q = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(
            target=_deposit_worker,
            args=(TEST_DSN, "evt_dup", "deposit", 1, "50.0", barrier, q),
        ) for _ in range(N)
    ]
    for w in workers: w.start()
    for w in workers: w.join(timeout=20)

    with db_conn.cursor() as cur:
        cur.execute("SELECT balance FROM users WHERE id = 1")
        balance = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM balance_events")
        be_count = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM processed_events")
        pe_count = cur.fetchone()[0]

    assert balance == Decimal("150.0"), f"дублирование! balance={balance}"
    assert be_count == 1, f"balance_events дублирован: {be_count}"
    assert pe_count == 1

Что тесты не покрывают

Throughput под реальной нагрузкой. NOWAIT сериализует доступ, throughput достигается через Celery retry с jittered backoff. Для нагрузочного тестирования нужен отдельный стенд с Celery workers.

Падение воркера посреди транзакции. reject_on_worker_lost=True требует убийства Celery worker процесса и проверки что задача возвращена в брокер. Это integration test, живёт отдельно.

Redis DLQ fallback. Требует реального Redis и имитации падения PostgreSQL. Проверяется через chaos testing.

Backpressure и деградация

Под высокой конкуренцией на одного пользователя RetryableError от NOWAIT накапливаются. max_retries=5 даёт 6 попыток (initial + 5 retry), суммарный worst case delay до ~63с. Если throughput входящих webhooks превышает способность разгребать retry, CeleryQueueDepth начнёт расти.

Когда DLQ начинает заполняться: alert_zombie_events зафиксирует dlq_size > 10 events/hour, первый сигнал. PostgreSQL dead_letter_queue растёт без ограничений, теоретически до размера диска. Дальше ручная работа: разобрать причину, починить, сделать replay из DLQ.

Автореплей не сделан сознательно. Событие попало в DLQ, значит что-то пошло не так. Пусть человек разберётся прежде, чем гнать его обратно. Не хочу, чтобы система сама решала что делать с деньгами, которые уже один раз упали.

Если Redis broker недоступен, apply_async всегда падает. Событие остаётся в enqueued, через 3 минуты recover_stale_enqueued_events переводит обратно в pending и инкрементит retry_count. После MAX_RECOVERY_ATTEMPTS (10 попыток ~ 30 минут) событие уходит в DLQ + алерт.

acks_late и reject_on_worker_lost защищают от падения воркера, не брокера. Если master Redis упадёт, in-flight задачи потеряны. appendonly yes + appendfsync everysec, минимум который должен быть. Если совсем не хочешь терять данные, appendfsync always, но throughput просядет.

Про blockchain reorg

Для Ethereum после перехода на PoS финализация через два checkpoint эпохи (~12-15 минут). Эвристика «12 блоков» из эпохи PoW сейчас не применима. Для L2 (Arbitrum, Optimism) правила другие, здесь только про Ethereum L1.

Текущая реализация reorg не обрабатывает, это скоуп первого релиза. При реорге блокчейн «откатывает» несколько блоков. Транзакция может попасть в новый блок без изменений или не попасть никуда, эффективно отменена.

Самый простой вариант: компенсирующая запись в balance_events со знаком минус, с отдельным idempotency key чтобы не конфликтовать с оригинальным событием:

def handle_reorg_event(original_tx_hash: str, user_id: int, amount: Decimal) -> None:
    reorg_event_id  = f"reorg:{original_tx_hash}"
    idempotency_key = _idempotency_key(reorg_event_id, "reorg_compensation")
    # далее стандартный flow через processed_events + balance_events

Что в бэклоге

Самое болезненное прямо сейчас: notify_user_insufficient_funds вызывается вне транзакции, нужен outbox pattern, запись в outbox-таблицу внутри той же транзакции что и UPDATE users. Без этого при at-least-once delivery пользователь получает N уведомлений на один отказ, а при сбое notify в DLQ попадают записи об успешно завершённых транзакциях. При >50 воркеров прямые соединения упираются в max_connections, нужен PgBouncer. Мина которую стоит учесть до его включения в transaction pooling: hot_path_balance_check использует conn.set_isolation_level(REPEATABLE_READ), это session-level команда, PgBouncer в transaction pooling её не сохраняет между транзакциями. SET LOCAL lock_timeout/statement_timeout работают нормально (они tx-scoped), а isolation level придётся переписать на BEGIN ISOLATION LEVEL REPEATABLE READ прямо в запросе или оставить session pooling для этой конкретной таски. Это классическая сеньорская мина: в dev всё работает, ломается только после включения PgBouncer в проде. processed_events растёт, cleanup закрывает проблему на ближайшее время, но партиционирование по дате при переходе за сотни миллионов строк неизбежно.

Дальше: Redis broker с appendfsync always, полная историческая сверка через materialized view, полная реализация reorg handling, Admin UI для DLQ вместо ручного SQL. Одна ловушка в коде которую я пока не трогал: _mark_event_failed коммитит сама, при будущем рефакторинге это тебя укусит.

Результат

Очередь разгребается и суммы сходятся, это разные вещи. Prometheus показывает первое, hot_path_balance_check второе. Нужны оба, одним не обойтись.

8 месяцев в проде. 0 дублирующих зачислений после деплоя фикса, против 23 за первый месяц на ~180k транзакций. Webhook-доставка 100% с учётом retry-логики провайдера.

Здесь нет ни одного решения которое я придумал заранее. Каждое закрывает дыру которая уже стрельнула. Idempotency через DB unique constraint, TOCTOU через SELECT FOR UPDATE NOWAIT, потеря транзакций через acks_late + outbox, FSM через VALID_TRANSITIONS. Каждое из этих решений по отдельности не гарантия. Вместе закрывают друг друга. Восемь месяцев без инцидентов.

Комментарии (7)


  1. alexander-java
    27.04.2026 17:17

    Блин. Есть всякие java с соответствующей обвязкой фреймворками \ библиотеками. Ещё и куча требований есть к начинающим программистам: чистый код, всякие архитектурные шаблоны. И то бизнес нос воротит.

    А вот пример, за что бизнес платит деньги.


  1. nikulin_krd
    27.04.2026 17:17

    • Сколько ошибок в проектировании финтех проекта хотите?

    • Да!

    Прям полный список антипаттернов...

    1. Откройте для себя Check Constraint в PostgreSQL, чтобы не делать глупых селектов баланса

    2. Откройте для себя Saga и RabbitMQ хотя бы, чтобы не городить велосипеды на PostgreSQL

    3. Перестаньте хранить денежные единицы во float

    4. Откройте для себя представления в PostgreSQL и модуль крона в нем

    5. Откройте для себя уровни изоляции транзакций и понимание их сайд-эффектов

    6. Перестаньте писать спагетти-код


    1. wicsion Автор
      27.04.2026 17:17

      разберём по пунктам:

      1. CHECK constraint есть, balance_non_negative CHECK (balance >= 0) прямо в схеме в статье.

      2. Осознанный выбор в пользу PostgreSQL как единственного источника правды. Unique constraint даёт атомарность которую Saga с RabbitMQ без persistence не даст.

      3. именно про это написан отдельный раздел. NUMERIC(38,18) везде, _validate_amount явно отклоняет float с сообщением "float не допускается".

      4. Представления и pg_cron, вкусовщина, не антипаттерн.

      5. Уровни изоляции, REPEATABLE READ в balance check, NOWAIT на всех блокировках, lock_timeout, явный catch на DeadlockDetected.

      6. Спагетти-код, не понял к чему это.

      Такое ощущение, что вы комментировали не читая статью.


      1. nikulin_krd
        27.04.2026 17:17

        1. Мой косяк. Посмотрел на первый запрос.

        2. "Unique constraint даёт атомарность которую Saga с RabbitMQ без persistence не даст." с чего вдруг? Создаете ключ идемпотентности и вперед. Ключ есть в базе - обрабатываете сообщение, ключа нет - nack. Мало того, при большом объеме транзакций, у вас база просто захлебнется от того количества запросов, которые к ней идут. Это велосипед, причем крайне сомнительный

        3. Зачем вам 18 знаков после запятой, если вы храните в неделимых единицах криптовалюты?

        4. Представления и pg_cron позволяют нормально читать без портяночных запросов в коде

        5. Претензия обоснована. Не увидел.

        6. Как-нибудь поймете


        1. wicsion Автор
          27.04.2026 17:17

          2. Unique constraint в PostgreSQL и есть idempotency key. RabbitMQ без persistence, дополнительная точка отказа. Нагрузка, осознанный трейдофф, PgBouncer и партиционирование в бэклоге.

          3. Храним в ETH, провайдер присылает сконвертированное. NUMERIC(38,18), worst case для любого ERC-20 с 18 decimals.

          5. REPEATABLE READ, NOWAIT, lock_timeout, statement_timeout, catch на LockNotAvailable, QueryCanceled, DeadlockDetected, всё в коде.


          1. nikulin_krd
            27.04.2026 17:17

            Я же написал, что ваша претензия обоснована по 5-ому пункту


  1. redfox0
    27.04.2026 17:17

    Так и предаствляю владельца бизнеса: всё было хорошо, пришёл какой-то программист, стало плохо, переписал всё, теперь стало ещё хуже и никто не понимает написанный код. true story