Привет, Хабр!
Сегодня мы рассмотрим реализацию Outbox паттерна в разных ЯП. Цель простая: одним атомарным действием обновлять бизнес-данные и фиксировать факт события, а затем надежно доставлять его в брокер. Разберем общую схему, конкуренцию воркеров, ретраи, дедупликацию, метрики и покажу, как это собрать на C#, Java, Go, Python и Node.js.
Outbox-паттерн придумали, чтобы решить простую, но очень неприятную проблему: сервис обновил данные в своей базе, а сообщение во внешний мир отправить не получилось. Например, пользователь зарегистрировался, в таблице он появился, а сообщение «UserCreated» в Kafka не улетело. И всё, в соседних сервисах этого события как будто не было. Получается рассинхрон, который потом не разгребёшь.
Смысл паттерна в том, что мы не пытаемся одним махом сохранить и в базу, и в брокер. Вместо этого всё делаем в два шага. В транзакции вместе с бизнес-данными сохраняем строку в таблицу outbox: тип события, payload, ключ. Это атомарная операция, или пишется всё, или ничего. А дальше уже отдельный воркер вычитывает эти строки и публикует их в Kafka, RabbitMQ или куда угодно. Если брокер временно недоступен, строка просто остаётся в outbox и будет доставлена при следующей попытке.
В итоге мы никогда не потеряем событие, если база его зафиксировала. Да, событие может прийти дважды, поэтому на стороне потребителей нужен идемпотентный код (чтобы повтор не сломал логику). Зато вся схема прозрачная: есть таблица-очередь, есть фоновые диспатчеры, есть метрики по зависшим событиям. Для маленьких сервисов это можно написать вручную, для крупных используют готовые решения вроде Debezium Outbox SMT или встроенный outbox в MassTransit и NServiceBus.
Общая схема данных и инварианты
Исходим из Postgres. Минимально рабочая модель:
-- outbox_events: единица отправки, одна строка = одно событие.
CREATE TABLE outbox_events (
id uuid PRIMARY KEY,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
headers jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
available_at timestamptz NOT NULL DEFAULT now(),
attempts int NOT NULL DEFAULT 0,
status text NOT NULL DEFAULT 'PENDING', -- PENDING | SENT | FAILED
dedup_key text, -- для идемпотентности на стороне потребителя или брокера
partition_key text, -- для фиксации порядка по агрегату
seq bigint -- монотонный номер по aggregate_id (опционально)
);
CREATE INDEX ON outbox_events (status, available_at);
CREATE INDEX ON outbox_events (partition_key);
CREATE UNIQUE INDEX IF NOT EXISTS outbox_dedup_uniq ON outbox_events(dedup_key) WHERE dedup_key IS NOT NULL;
-- Очередь чтения конкурентными воркерами: SKIP LOCKED позволяет безопасно шардировать нагрузку.
-- Важно ограничивать пачку.
Инварианты:
Пишем бизнес-строку и outbox-строку в ОДНОЙ транзакции.
Доставщик забирает пачку PENDING с
FOR UPDATE SKIP LOCKED
, чтобы воркеры не дрались.Ретраим с backoff, считаем attempts и логируем ошибку целиком.
На потребителе либо идемпотентное применение, либо дедуп по
dedup_key
.Порядок по агрегату обеспечиваем
partition_key
иseq
, и партиционируем события по ключу в брокере.
Алгоритм диспатчера
Псевдокод для понимания, он одинаковый для всех реализаций ниже:
loop:
begin;
rows = select * from outbox_events
where status = 'PENDING' and available_at <= now()
order by created_at
for update skip locked
limit N;
if rows empty: commit; sleep(small); continue;
for row in rows:
try send_to_broker(row)
mark SENT
catch e:
attempts++
if attempts < max_attempts:
available_at = now() + backoff(attempts)
keep PENDING
else:
status = 'FAILED'
commit;
Если Kafka уже включена в контур, можно вместо poller использовать Debezium Outbox SMT на коннекторе, который вычитывает изменения из outbox таблицы и публикует в топики без вашего кода.
C# + EF Core
Берем EF Core с перехватчиком SaveChanges
для автодобавления outbox-события и BackgroundService
для диспатча.
// Domain event
public sealed record UserCreated(Guid UserId, string Email);
// EF entity
public class User {
public Guid Id { get; set; }
public string Email { get; set; } = null!;
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
}
// Outbox EF entity
public class OutboxEvent {
public Guid Id { get; set; }
public string AggregateType { get; set; } = null!;
public string AggregateId { get; set; } = null!;
public string EventType { get; set; } = null!;
public string Payload { get; set; } = null!;
public string Headers { get; set; } = "{}";
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
public DateTimeOffset AvailableAt { get; set; } = DateTimeOffset.UtcNow;
public int Attempts { get; set; }
public string Status { get; set; } = "PENDING";
public string? DedupKey { get; set; }
public string? PartitionKey { get; set; }
public long? Seq { get; set; }
}
public sealed class AppDbContext(DbContextOptions<AppDbContext> options) : DbContext(options) {
public DbSet<User> Users => Set<User>();
public DbSet<OutboxEvent> Outbox => Set<OutboxEvent>();
}
Перехватчик: складываем outbox-строку в текущую транзакцию.
public sealed class OutboxSaveChangesInterceptor : SaveChangesInterceptor {
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(
DbContextEventData eventData,
InterceptionResult<int> result,
CancellationToken cancellationToken = default)
{
var ctx = (AppDbContext)eventData.Context!;
var entries = ctx.ChangeTracker.Entries<User>()
.Where(e => e.State == EntityState.Added)
.ToList();
foreach (var e in entries) {
var ev = new UserCreated(e.Entity.Id, e.Entity.Email);
var outbox = new OutboxEvent {
Id = Guid.NewGuid(),
AggregateType = "User",
AggregateId = e.Entity.Id.ToString(),
EventType = nameof(UserCreated),
Payload = JsonSerializer.Serialize(ev),
DedupKey = $"user-created:{e.Entity.Id}",
PartitionKey = e.Entity.Id.ToString()
};
ctx.Outbox.Add(outbox);
}
return base.SavingChangesAsync(eventData, result, cancellationToken);
}
}
Диспатчер: конкурентная выборка пачки через сырой SQL с FOR UPDATE SKIP LOCKED
. EF поддерживает ExecuteSql
и маппинг в entity, но для адекватного контроля берем Npgsql
напрямую.
public sealed class OutboxDispatcher : BackgroundService {
private readonly IDbContextFactory<AppDbContext> _factory;
private readonly IKafkaProducer _producer; // ваш интерфейс
private readonly ILogger<OutboxDispatcher> _log;
public OutboxDispatcher(IDbContextFactory<AppDbContext> factory, IKafkaProducer producer, ILogger<OutboxDispatcher> log) {
_factory = factory; _producer = producer; _log = log;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
while (!stoppingToken.IsCancellationRequested) {
try {
using var ctx = await _factory.CreateDbContextAsync(stoppingToken);
await using var tx = await ctx.Database.BeginTransactionAsync(stoppingToken);
// Берем пачку
var rows = await ctx.Outbox
.FromSqlRaw("""
select * from outbox_events
where status = 'PENDING' and available_at <= now()
order by created_at
for update skip locked
limit 100
""").ToListAsync(stoppingToken);
if (rows.Count == 0) {
await tx.CommitAsync(stoppingToken);
await Task.Delay(TimeSpan.FromMilliseconds(200), stoppingToken);
continue;
}
foreach (var row in rows) {
try {
await _producer.SendAsync(topic: "user-events",
key: row.PartitionKey ?? row.AggregateId,
value: row.Payload,
headers: row.Headers);
row.Status = "SENT";
}
catch (Exception ex) {
_log.LogError(ex, "outbox send failed {Id}", row.Id);
row.Attempts += 1;
if (row.Attempts < 10) {
row.AvailableAt = DateTimeOffset.UtcNow + Backoff(row.Attempts);
} else {
row.Status = "FAILED";
}
}
}
await ctx.SaveChangesAsync(stoppingToken);
await tx.CommitAsync(stoppingToken);
}
catch (Exception e) {
_log.LogError(e, "dispatcher loop error");
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
}
}
}
private static TimeSpan Backoff(int attempts) {
var delay = Math.Min(TimeSpan.FromMinutes(5).TotalMilliseconds, Math.Pow(2, attempts) * 100);
return TimeSpan.FromMilliseconds(delay);
}
}
Если вы уже используете NServiceBus или MassTransit, у них есть встроенный Outbox. Это экономит сотни строк кода и прошито в пайплайн.
Java 21 + Spring Boot + JPA: poller и Debezium как альтернатива
С JPA картина аналогичная: пишем сущность outbox и сохраняем в той же транзакции, где меняем доменную модель. Для диспатча используем JdbcTemplate
с явным SQL.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
private UUID id;
private String aggregateType;
private String aggregateId;
private String eventType;
@Column(columnDefinition = "jsonb")
private String payload;
@Column(columnDefinition = "jsonb")
private String headers;
private OffsetDateTime createdAt;
private OffsetDateTime availableAt;
private Integer attempts;
private String status;
private String dedupKey;
private String partitionKey;
private Long seq;
}
Сохранение в одной транзакции:
@Service
public class UserService {
private final UserRepository users;
private final OutboxRepository outbox;
@Transactional
public UUID createUser(String email) {
var user = users.save(new User(email));
var ev = new OutboxEvent(/* set fields, including dedupKey and partitionKey=user.getId().toString() */);
outbox.save(ev);
return user.getId();
}
}
Диспатч пачками:
@Component
public class OutboxDispatcher {
private final JdbcTemplate jdbc;
private final KafkaTemplate<String, String> kafka;
@Scheduled(fixedDelay = 200)
public void tick() {
TransactionTemplate tt = new TransactionTemplate(new DataSourceTransactionManager(jdbc.getDataSource()));
tt.executeWithoutResult(tx -> {
List<Map<String,Object>> rows = jdbc.queryForList("""
select * from outbox_events
where status = 'PENDING' and available_at <= now()
order by created_at
for update skip locked
limit 100
""");
for (var row : rows) {
try {
kafka.send("user-events",
(String)row.get("partition_key"),
(String)row.get("payload")).get();
jdbc.update("update outbox_events set status='SENT' where id = ?",
row.get("id"));
} catch (Exception ex) {
jdbc.update("""
update outbox_events
set attempts = attempts + 1,
available_at = CASE WHEN attempts+1 < 10
THEN now() + make_interval(secs => round(power(2, attempts+1) * 0.1))
ELSE available_at END,
status = CASE WHEN attempts+1 >= 10 THEN 'FAILED' ELSE status END
where id = ?
""", row.get("id"));
}
}
});
}
}
CDC-вариант: Debezium Outbox SMT на коннекторе Postgres. В приложении только пишемстроки в outbox, коннектор сам сформирует события в Kafka, включая ключ по aggregate_id
для порядка и маршрутизацию по event_type
. Фрагмент конфигурации SMT:
{
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.fields.additional.placement": "type:header:eventType,aggregate_type:header:aggregateType",
"transforms.outbox.route.by.field": "event_type",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.timestamp": "created_at"
}
Go + pgx: контроль транзакций и конкурентный poller
Go хорош тем, что легко держать в голове контроль над транзакциями и пулы коннектов.
type OutboxEvent struct {
ID uuid.UUID
AggregateType string
AggregateID string
EventType string
Payload []byte
Headers []byte
CreatedAt time.Time
AvailableAt time.Time
Attempts int
Status string
DedupKey *string
PartitionKey *string
Seq *int64
}
func CreateUser(ctx context.Context, db *pgxpool.Pool, email string) (uuid.UUID, error) {
tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil { return uuid.Nil, err }
defer tx.Rollback(ctx)
var userID uuid.UUID
if err := tx.QueryRow(ctx, `
insert into users(email) values ($1) returning id
`, email).Scan(&userID); err != nil {
return uuid.Nil, err
}
ev := OutboxEvent{
ID: uuid.New(), AggregateType: "User",
AggregateID: userID.String(), EventType: "UserCreated",
Payload: mustJSON(struct{UserID, Email string}{userID.String(), email}),
PartitionKey: ptr(userID.String()), DedupKey: ptr("user-created:" + userID.String()),
Status: "PENDING",
}
_, err = tx.Exec(ctx, `
insert into outbox_events(id, aggregate_type, aggregate_id, event_type, payload, headers, status, dedup_key, partition_key)
values ($1,$2,$3,$4,$5,'{}','PENDING',$6,$7)
`, ev.ID, ev.AggregateType, ev.AggregateID, ev.EventType, ev.Payload, ev.DedupKey, ev.PartitionKey)
if err != nil { return uuid.Nil, err }
if err := tx.Commit(ctx); err != nil { return uuid.Nil, err }
return userID, nil
}
Диспатчер с конкурентной выборкой:
func DispatchLoop(ctx context.Context, db *pgxpool.Pool, prod KafkaProducer) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
func() {
tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil { log.Printf("tx error: %v", err); return }
defer tx.Rollback(ctx)
rows, err := tx.Query(ctx, `
select id, aggregate_id, payload, partition_key
from outbox_events
where status = 'PENDING' and available_at <= now()
order by created_at
for update skip locked
limit 100
`)
if err != nil { log.Printf("query error: %v", err); return }
idsSent := make([]uuid.UUID, 0, 100)
for rows.Next() {
var id uuid.UUID; var aggID, payload, pkey string
if err := rows.Scan(&id, &aggID, &payload, &pkey); err != nil { log.Printf("scan: %v", err); return }
if err := prod.Send("user-events", pkey, payload); err != nil {
if _, e := tx.Exec(ctx, `
update outbox_events
set attempts = attempts + 1,
available_at = now() + (power(2, attempts+1) * interval '100 milliseconds'),
status = case when attempts+1 >= 10 then 'FAILED' else status end
where id = $1
`, id); e != nil { log.Printf("retry update: %v", e) }
} else {
idsSent = append(idsSent, id)
}
}
if len(idsSent) > 0 {
_, err = tx.Exec(ctx, `
update outbox_events set status='SENT' where id = any($1)
`, idsSent)
if err != nil { log.Printf("mark sent error: %v", err); return }
}
if err := tx.Commit(ctx); err != nil { log.Printf("commit: %v", err) }
}()
}
}
}
Python 3.12 + SQLAlchemy 2.0: sync или async, разницы мало
На Python делаем то же самое. Покажу синхронный вариант для простоты.
from sqlalchemy import text
from sqlalchemy.orm import Session
import json, uuid, time
from datetime import datetime, timedelta
def create_user(session: Session, email: str):
user_id = session.execute(text("insert into users(email) values (:e) returning id"), {"e": email}).scalar_one()
payload = json.dumps({"user_id": str(user_id), "email": email})
session.execute(text("""
insert into outbox_events(id, aggregate_type, aggregate_id, event_type, payload, headers, status, dedup_key, partition_key)
values (:id, 'User', :aid, 'UserCreated', :payload, '{}'::jsonb, 'PENDING', :dk, :pk)
"""), {
"id": str(uuid.uuid4()),
"aid": str(user_id),
"payload": payload,
"dk": f"user-created:{user_id}",
"pk": str(user_id)
})
def dispatch_loop(engine, producer):
while True:
with engine.begin() as conn:
rows = conn.execute(text("""
select id, payload, coalesce(partition_key, aggregate_id) as k
from outbox_events
where status='PENDING' and available_at <= now()
order by created_at
for update skip locked
limit 100
""")).mappings().all()
sent_ids = []
for r in rows:
try:
producer.send("user-events", key=r["k"], value=r["payload"])
sent_ids.append(r["id"])
except Exception:
conn.execute(text("""
update outbox_events
set attempts = attempts + 1,
available_at = now() + (power(2, attempts+1) * interval '100 milliseconds'),
status = case when attempts+1 >= 10 then 'FAILED' else status end
where id = :id
"""), {"id": r["id"]})
if sent_ids:
conn.execute(text("update outbox_events set status='SENT' where id = any(:ids)"),
{"ids": sent_ids})
time.sleep(0.2)
Node.js 22 + Prisma: транзакция через prisma.$transaction, выборка через сырой SQL
Prisma не дает удобный API на row-level locking, поэтому используем prisma.$queryRaw
c FOR UPDATE SKIP LOCKED
.
// сохранение
await prisma.$transaction(async (tx) => {
const user = await tx.user.create({ data: { email } });
await tx.$executeRawUnsafe(`
insert into outbox_events(id, aggregate_type, aggregate_id, event_type, payload, headers, status, dedup_key, partition_key)
values (gen_random_uuid(), 'User', $1, 'UserCreated', $2, '{}'::jsonb, 'PENDING', $3, $1)
`, user.id, JSON.stringify({ userId: user.id, email }), `user-created:${user.id}`);
});
// диспатчер
async function dispatchOnce() {
return prisma.$transaction(async (tx) => {
const rows = await tx.$queryRawUnsafe(`
select id, payload, coalesce(partition_key, aggregate_id) as k
from outbox_events
where status='PENDING' and available_at <= now()
order by created_at
for update skip locked
limit 100
`);
const sent: string[] = [];
for (const r of rows as any[]) {
try {
await kafka.send({ topic: "user-events", messages: [{ key: r.k, value: r.payload }] });
sent.push(r.id);
} catch {
await tx.$executeRawUnsafe(`
update outbox_events
set attempts = attempts + 1,
available_at = now() + (power(2, attempts+1) * interval '100 milliseconds'),
status = case when attempts+1 >= 10 then 'FAILED' else status end
where id = $1
`, r.id);
}
}
if (sent.length) {
await tx.$executeRawUnsafe(`update outbox_events set status='SENT' where id = any($1)`, sent);
}
});
}
setInterval(() => dispatchOnce().catch(console.error), 200);
CDC-вариант: Debezium Outbox SMT
Если Kafka уже в контур включена, можно не писать диспатчер. Debezium будет слушать change-log БД и транслировать строки из outbox в топики.
Фрагмент конфигурации SMT:
{
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "event_type",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.timestamp": "created_at",
"transforms.outbox.table.fields.additional.placement":
"event_version:header:eventVersion,aggregate_type:header:aggregateType"
}
Конечно, тут из минусов то, что получаем доп инфраструктуру, задержки CDC и внимательное отношение к правам и нагрузке на WAL.
Финальная проверка цели
Мы собрали полный путь: модель outbox, атомарная запись, конкурентный диспатч через SKIP LOCKED, ретраи, дедуп, порядок и метрики. Показал рабочие куски кода под пять стеков и альтернативу на CDC. Этого достаточно, чтобы внедрить паттерн без сюрпризов и заняться важным — доменной логикой. Если нужно, могу дописать секцию под ваш брокер и формат событий, или развернуть примеры под конкретную СУБД и фреймворк.
Когда вы строите систему, приходится балансировать: скорость против памяти, гибкость против поддержки, простота против расширяемости. Эти компромиссы неизбежны, и именно их чаще всего недооценивают — до тех пор, пока не начинают болеть реальные проекты. Если вы хотите глубже разобраться, как архитектурные решения отражаются на производительности и качестве кода, записывайтесь на бесплатные уроки:
8 сентября в 20:00 — Архитектура сборщиков мусора: компромисс между скоростью, памятью и паузами
17 сентября в 20:00 — Практическое руководство по применению SOLID принципов
Немного практики в тему — попробуйте пройти вступительный тест по Архитектуре и шаблонам проектирования и узнаете, есть ли пробелы в знаниях.
MyraJKee
На сколько это корректно, в Golang части, вызывать Rollback безотносительно того что случилось с транзакцией?