Я прочитал статью, и меня поразило, сколько сомнительных решений можно использовать для одной простой задачи.

В этой статье я расскажу, как правильно создать сервис для конкурентных обновлений остатков данных в базе данных. Буду использовать .NET, C#, Entity Framework и Postgres.

Условия

  • В базе данных хранятся остаток товаров и количество зарезервированных товаров по каждой паре (товар, склад)

  • Инвариант системы - для всех пар (товар, склад) количество зарезервированных товаров должно быть не больше остатка

  • Должен быть метод размещения заказа (резерва), который:

    • Получает Id заказа и массив строк заказа (Id товара, Id склада, количество)

    • Данные о заказе и позициях заказа сохраняет в базе данных, обновляет резервы

    • Для каждой позиции заказа обновляет количество зарезервированных товаров = количество зарезервированных товаров + количество в заказе

    • Работает транзакционно - выполняется полностью или не выполняется вовсе, не нарушает инварианты, не влияет на другие операции и другие операции не влияют на результат, если вернул положительный ответ, то данные уже не потеряются (ACID)

  • Система не должна падать с ошибками под нагрузкой


Каркас приложения

Для создания приложения выполняю команды

dotnet new webapi
dotnet add package Microsoft.EntityFrameworkCore
dotnet add package Microsoft.EntityFrameworkCore.Design

Я буду использовать Entity Framework, так как хочу создавать схему в коде и использовать миграции для обновления.

Модель

[PrimaryKey(nameof(ItemId), nameof(WarehouseId))]
public class Stock
{
    public int ItemId { get; set; }
    public int WarehouseId { get; set; }
    public int Quantity { get; set; }
    public int Reserved { get; set; }
}

[JsonObjectCreationHandling(JsonObjectCreationHandling.Populate)]
public class Order
{
    [Key]
    public Guid Id { get; set; }
    public List<OrderLine> Lines { get; } = new List<OrderLine>();
}

[PrimaryKey(nameof(OrderId), nameof(ItemId), nameof(WarehouseId))]
public class OrderLine
{
    [JsonIgnore]
    public Guid OrderId { get; set; }
    public int ItemId { get; set; }
    public int WarehouseId { get; set; }
    public int Quantity { get; set; }
}

По умолчанию применяем минимум четвертую нормальную форму, чтобы не требовалось массовых операций при добавлении\изменении\удалении одного элемента модели.

Первичные ключи гарантируют уникальность, то есть нельзя будет дважды вставить один заказ или сделать две строки заказа с одним и тем же товаром и складом. Кроме того все базы данных при назначении ключей создают индексы по ключевым полям, что может ускорить поиск.

Класс контекста

public class StockApiDataContext(DbContextOptions<StockApiDataContext> options) : DbContext(options)
{
    public DbSet<Order> Orders { get; set; } = null!;
    public DbSet<OrderLine> OrderLines { get; set; } = null!;
    public DbSet<Stock> Stock { get; set; } = null!;

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        var tableBuilder = modelBuilder.Entity<Stock>();
        tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
    }
}

Значение по умолчанию для поля Reserved нужно только для более удобного наполнения тестовыми данными.

База данных

В качестве сервера баз данных я буду использовать docker-контейнер, запущенный локально

docker run -d \
  --name postgres \
  -p 5432:5432 \
  -e POSTGRES_PASSWORD=P@ssw0rd \
  postgres

В проект добавлю пакеты для работы с Postgres

dotnet add package Npgsql.EntityFrameworkCore.PostgreSQL
dotnet add package EFCore.NamingConventions

Последний пакет дает возможность изменить соглашение именования объектов в базе данных с PascalCase на snake_case, что позволяет писать запросы к postgres без экранирования имен.

Для создания миграций установлю инструменты

dotnet tool install --global dotnet-ef

Код приложения для подключения к базе данных и применения миграций

var builder = WebApplication.CreateBuilder(args);
// ...
builder.Services.AddDbContext<StockApiDataContext>(opt =>
    opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"))
       .UseSnakeCaseNamingConvention()
);
// ...
var app = builder.Build();
//...
using (var scope = app.Services.CreateScope())
{
    var db = scope.ServiceProvider.GetRequiredService<StockApiDataContext>();
    await db.Database.MigrateAsync();
}

Теперь можно создать миграции

dotnet ef migrations add Initial

Код сервиса

app.MapPost("/place-order", 
            async (Order order, 
                   StockApiDataContext ctx, 
                   CancellationToken ct) =>
{
    var lines = from l in order.Lines
                group l by new { l.ItemId, l.WarehouseId } into g
                select new OrderLine()
                {
                    ItemId = g.Key.ItemId,
                    WarehouseId = g.Key.WarehouseId,
                    Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
                };
    lines = lines.ToArray();

    order.Lines.Clear();
    order.Lines.AddRange(lines);
    ctx.Orders.Add(order);
    await ctx.SaveChangesAsync(ct);    
})
  • Данные сущности Order заполняются с помощью привязки значений, при этом поле Order.Lines не имеет сеттера, но указав атрибут JsonObjectCreationHandling(JsonObjectCreationHandling.Populate), можно заполнять значениями существующие объекты, а не создавать новые.

  • В запросе может прийти множество строк заказа с одинаковыми ItemId и WarehouseId, они склеиваются в одну, суммируя количество.

Код только сохраняет заказы в базу, не обновляет и не проверяет остатки. Используем его для получения бейзлайна по быстродействию.

Тест быстродействия

Использую k6 от графаны

winget install k6 --source winget

Код теста

import http from 'k6/http';

function rand(min, max) {
    return Math.round(Math.random() * (max - min) + min);
}

const itemsCount = 10;
const warehousesCount = 10;
export const options = {
    iterations: itemsCount * warehousesCount * 300,
    vus: 50,
};

export default function () {
    const url = __ENV.applicationUrl ?? 'http://localhost:5011';
    const params = {
        headers: {
            'Content-Type': 'application/json',
        },
    };

    const count = rand(1, itemsCount);
    let lines = [];
    for (let index = 0; index < count; index++) {
        lines.push({
            itemId: rand(1, itemsCount),
            warehouseId: rand(1, warehousesCount),
            quantity: rand(1, 5)
        })
    }
    const payload = JSON.stringify({ id: crypto.randomUUID(), lines: lines });


    http.post(`${url}/place-order`, payload, params);
}
  • Тест генерирует запрос в котором от 1 до 10 позиций, в которых случайные id для товаров и складов в диапазоне от 1 до 10 и от 1 до 5 единиц товара в каждой позиции.

  • Тест выполняется параллельно в 50 потоков

  • Общее количество итераций = 300 х 10 х 10 = 30 000

  • Так как количество строк заказа будет варьироваться между тестами, то и время забега будет немного отличаться, поэтому я буду делать несколько замеров и выкладывать средний.

Нагрузочный тест

Запуск приложения

dotnet run -c Release -- Logging:LogLevel:Microsoft.EntityFrameworkCore.Database.Command="Error"

По умолчанию ef core все запросы логирует в консоль, чтобы консоль не искажала результаты замера быстродействие я оставил вывод в консоль только ошибок.

Перед каждым забегом я сбрасываю состояние базы данных sql-скриптом

TRUNCATE TABLE orders CASCADE;
TRUNCATE TABLE stock;
INSERT INTO stock 
select *, random(10000, 100000),0 as quantity from generate_series(1,10) item_id, generate_series(1,10) warehouse_id;

Запуск теста (в другом терминале)

k6 run k6test.js

Результат

HTTP
http_req_duration..............: avg=16.03ms min=4.85ms med=14.86ms max=385.25ms p(90)=20.7ms  p(95)=23.46ms
  { expected_response:true }...: avg=16.03ms min=4.85ms med=14.86ms max=385.25ms p(90)=20.7ms  p(95)=23.46ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  3074.002522/s

Это бейзлайн, относительно которого я буду оценивать быстродействие.

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/initial


Обновление остатков

Добавлю логику обновления остатков на складах, пока без проверки

app.MapPost("/place-order/", 
            async (Order order, 
                   StockApiDataContext ctx, 
                   CancellationToken ct) =>
{
    var lines = from l in order.Lines
                group l by new { l.ItemId, l.WarehouseId } into g
                select new OrderLine()
                {
                    ItemId = g.Key.ItemId,
                    WarehouseId = g.Key.WarehouseId,
                    Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
                };
    lines = lines.ToArray();

    order.Lines.Clear();
    order.Lines.AddRange(lines);

    await using var t = await ctx.Database.BeginTransactionAsync(ct);
    ctx.Orders.Add(order);
    await ctx.SaveChangesAsync(ct);
    foreach (var l in order.Lines)
    {
        await ctx.Stock
                 .Where(s => s.ItemId == l.ItemId 
                        && s.WarehouseId == l.WarehouseId)
                 .ExecuteUpdateAsync(setter => 
                                     setter.SetProperty(s => s.Reserved, 
                                                        s => s.Reserved + l.Quantity),
                                     ct);
    }
    await t.CommitAsync(ct);

})

И сразу же получаем взаимоблокировки (deadlock) при нагрузочном тесте и сыпятся ошибки в консоли.

Почему происходит deadlock

  • База данных имеет по умолчанию уровень изоляции read committed, то есть не может прочитать данные, которые были изменены, но еще не закоммичены другой транзакцией.

  • Для этого Postgres (и другие СУБД) накладывает блокировку на запись строки и снимает её только в конце транзакции.

  • Когда параллельных транзакций больше одной и они меняют больше одной строки, то может случиться так:

    • Транзакция А успела заблокировать строку 1

    • Транзакция Б успела заблокировать строку 2

    • Транзакция А повисла в ожидании снятия блокировки строки 2

    • Транзакция Б повисла в ожидании снятия блокировки строки 1

  • Ситуация, описанная выше, называется взаимоблокировка (deadlock). Postgres (и другие СУБД) определяет, что появилась такая блокировка и отменяет одну из заблокированных транзакций, чтобы другие могли выполняться.

Как бороться со взаимоблокировками

Простое правило:

Все транзакции должны накладывать блокировки в одном и том же порядке

Для этого просто отсортируем позиции по Id товара и склада

// ...
foreach (var l in order.Lines.OrderBy(l => l.ItemId).ThenBy(l => l.WarehouseId))
// ...

Результаты забега

HTTP
http_req_duration..............: avg=92.86ms min=4.45ms med=23.4ms  max=6.03s p(90)=186.04ms p(95)=390.5ms 
  { expected_response:true }...: avg=92.86ms min=4.45ms med=23.4ms  max=6.03s p(90)=186.04ms p(95)=390.5ms 
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  536.144972/s

Из-за ожиданий блокировок некоторые транзакции висят по несколько секунд. Пропускная способность упала почти в 6 раз.

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-update-loop


Обновление остатков одним запросом

Предыдущий вариант плох тем, что мы очень много раз бегаем в базу, отправляя отдельные запросы на обновление остатков, хотя саму логику обновления мы можем сделать одним запросом

app.MapPost("/place-order/", 
            async (Order order, 
                   StockApiDataContext ctx, 
                   CancellationToken ct) =>
{
    var lines = from l in order.Lines
                group l by new { l.ItemId, l.WarehouseId } into g
                select new OrderLine()
                {
                    ItemId = g.Key.ItemId,
                    WarehouseId = g.Key.WarehouseId,
                    Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
                };
    lines = lines.ToArray();

    order.Lines.Clear();
    order.Lines.AddRange(lines);

    await using var t = await ctx.Database.BeginTransactionAsync(ct);
    ctx.Orders.Add(order);
    await ctx.SaveChangesAsync(ct);

    var q = from l in ctx.OrderLines
            where l.OrderId == order.Id
            join s in ctx.Stock
            on new { l.ItemId, l.WarehouseId } 
              equals new { s.ItemId, s.WarehouseId }
            select new { s, l };
    await q.ExecuteUpdateAsync(setter => 
                               setter.SetProperty(x => x.s.Reserved, 
                                                  x => x.s.Reserved + x.l.Quantity),
                               ct);
    await t.CommitAsync(ct);
    
})

К сожалению такой код будет вызывать deadlock, так как Postgres не гарантирует в каком порядке будут накладываться блокировки на строки. Это зависит от порядка записей на диске, а порядок записей на диске зависит от предыдущих операций обновления.

Результаты забега при появлении deadlock

HTTP
http_req_duration..............: avg=87.22ms min=4.28ms med=19.75ms max=13.26s p(90)=160.89ms p(95)=326.21ms
  { expected_response:true }...: avg=86.34ms min=4.28ms med=19.74ms max=13.26s p(90)=160.48ms p(95)=321.98ms
http_req_failed................: 0.05%  15 out of 30000
http_reqs......................: 30000  571.569136/s

Результаты забега когда взаимоблокировки не возникают

HTTP
http_req_duration..............: avg=69.83ms min=4.15ms med=19.94ms max=5.32s p(90)=155.74ms p(95)=280.21ms
  { expected_response:true }...: avg=69.83ms min=4.15ms med=19.94ms max=5.32s p(90)=155.74ms p(95)=280.21ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  713.321511/s

Это значительно быстрее, чем обновление остатков с цикле, но нужно как-то бороться с deadlock_ами.

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-check-query


Что делать с deadlock

Если вы не можете обеспечить одинаковый порядок блокирования ресурсов, то в общем случае есть две стратегии: выбор более строгого режима изоляции транзакций и увеличение гранулярности блокировок.

Но в Postgres увеличение уровня изоляции до repeatable read или serializable приведет к тому, что ошибок станет еще больше, так как postgres откатывает транзакции при нарушении изоляции.

Увеличение гранулярности может помочь, но:

  • Это приведет к уменьшению параллельности запросов, вплоть до полостью последовательного выполнения, что снизит пропускную способность

  • Потребует руками писать SQL-код, так как у EF Core нет методов для ручного управления блокировками

Поэтому лучше смириться с тем, что часть запросов будут отваливаться и просто повторять их. Тем более в EF Core встроен механизм отказоустойчивости, он автоматически повторяет запросы, если считает что ошибка некритична.

Такой механизм полезен не только для Postgres, но и других баз данных, так как они тоже могут выдавать ошибки взаимоблокировок и другие временные (transient) ошибки.


Повторение запросов

Для повторения запросов надо это повторение включить

// ...
builder.Services.AddDbContext<StockApiDataContext>(opt =>
    opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"),
        npgOptions => npgOptions.EnableRetryOnFailure()) // включить повторение запросов
       .UseSnakeCaseNamingConvention()
);
// ...

А также нужно использовать "стратегию" в коде, чтобы группа запросов повторялась в одной транзакции

app.MapPost("/place-order/", 
            async (Order order, 
                   StockApiDataContext ctx, 
                   CancellationToken ct) =>
{
    var lines = from l in order.Lines
                group l by new { l.ItemId, l.WarehouseId } into g
                orderby g.Key.ItemId, g.Key.WarehouseId // добавлено
                select new OrderLine()
                {
                    ItemId = g.Key.ItemId,
                    WarehouseId = g.Key.WarehouseId,
                    Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
                };
    lines = lines.ToArray();

    order.Lines.Clear();
    order.Lines.AddRange(lines);

    var db = ctx.Database;
    await db.CreateExecutionStrategy().ExecuteInTransactionAsync(async ct =>
    {
        ctx.Orders.Add(order);
        await ctx.SaveChangesAsync(ct);
        var q = from l in ctx.OrderLines
                where l.OrderId == order.Id
                join s in ctx.Stock
                on new { l.ItemId, l.WarehouseId } 
                  equals new { s.ItemId, s.WarehouseId }
                select new { s, l };
        await q.ExecuteUpdateAsync(setter => 
                                   setter.SetProperty(x => x.s.Reserved, 
                                                      x => x.s.Reserved + x.l.Quantity),
                                   ct);        
    }, ct => Task.FromResult(false), ct);
    
})

Также добавил сортировку строк по ItemId и WarehouseId перед добавлением в базу данных, чтобы сократить количество взаимоблокировок.

Результаты забега

HTTP
http_req_duration..............: avg=74.72ms min=4.56ms med=19.61ms max=6.32s p(90)=146.08ms p(95)=284.93ms
  { expected_response:true }...: avg=74.72ms min=4.56ms med=19.61ms max=6.32s p(90)=146.08ms p(95)=284.93ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  666.887768/s

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-update-query-retry


Проверка остатков

Для этого достаточно добавить одну строку в код выше

await q.ExecuteUpdateAsync(/* прощено */);

// Проверка остатков
if (await q.AnyAsync(x => 
                     x.s.Quantity < x.s.Reserved, 
                     ct)) throw new Exception("Oversell");

Так как предыдущий код, обновляющий остатки, уже навешивает блокировки на все строки, то проверка этих же строк будет изолирована от других транзакций.

Если инвариант будет нарушен, но EF Core не будет повторять запрос, так как такая ошибка не считается "временной".

Результаты забега

HTTP
http_req_duration..............: avg=79.47ms min=4.68ms med=21.88ms max=4.36s p(90)=168.33ms p(95)=320.29ms
  { expected_response:true }...: avg=79.47ms min=4.68ms med=21.88ms max=4.36s p(90)=168.33ms p(95)=320.29ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  627.168904/s

Нагрузочный тест делается в условиях когда запасы заведомо превышают все резервы заказов, так как ошибки сильно влияют на время выполнения запросов.

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-check-query

Итого

  • Вся необходимая логика реализована, без ручных блокировок и на уровне изоляции read committed

  • В каждом запросе среднем 5 позиций заказа, получаем (627.168904*5)=3 135.84452 обновлений остатков в секунду

  • В тесте всего 10 разных товаров, на один товар приходится (3 135.84452/10)=313.584452, округленно 314 обновлений остатков в секунду

  • Это на домашнем компе, с Postgres в docker в wsl

  • Во время теста показатели скорости записи на диск не превышали 5 мб\сек

Мне кажется что 600 заказов в секунду и 300 резервов на один товар это вполне достаточная производительность для любого маркетплейса.


Логика в базе данных

По житейской логике заказ всегда должен создаваться вместе с резервом товаров по этому заказу. Поэтому нет необходимости каждый раз из приложения отправлять одни и те же запросы на обновление и контроль остатков. В базе данных есть средства делать это автоматически.

На этом месте общественность может возмутиться: "как же так, бизнес-логика в базе данных это плохо". Но плохо это, если бизнес-логика в базе данных обновляется и проверяется независимо от приложения. Тогда очень легко допустить несогласованные изменения.

К счастью для EF есть расширение, которое позволяет делать триггеры в базе данных на C#

dotnet add package Laraue.EfCoreTriggers.PostgreSql

Нужно добавить расширение к контексту

builder.Services.AddDbContext<StockApiDataContext>(opt =>
    opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"),
            npgOptions => npgOptions.EnableRetryOnFailure())
       .UseSnakeCaseNamingConvention()
       .UsePostgreSqlTriggers() // добавлено
);

И добавить необходимые триггеры и проверки в модель

// Изменения в классе StockApiDataContext, остальной код не меняется
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    var tableBuilder = modelBuilder.Entity<Stock>();
    var reservedProp = tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
    var quantityProp = tableBuilder.Property(s => s.Quantity);


    tableBuilder.ToTable(t =>
        t.HasCheckConstraint("check_stock", $"{quantityProp.Metadata.GetColumnName()} >= {reservedProp.Metadata.GetColumnName()}"));

    modelBuilder.Entity<OrderLine>()
                .AfterInsert(t =>
                    t.Action(a =>
                        a.Update<Stock>(
                            (l, s) => s.ItemId == l.New.ItemId 
                                      && s.WarehouseId == l.New.WarehouseId,
                            (l, s) => new Stock { 
                              Reserved = s.Reserved + l.New.Quantity }
                        )
                    )
                );

}

Такой код создает триггер, который на каждое добавление OrderLine вызывает обновление таблицы Stock. Контроль остатков выполняется обычным check constraint.

Код метода при этом упрощается

app.MapPost("/place-order/", async (Order order, 
                                    StockApiDataContext ctx, 
                                    CancellationToken ct) =>
{
    var lines = from l in order.Lines
                group l by new { l.ItemId, l.WarehouseId } into g
                orderby g.Key.ItemId, g.Key.WarehouseId
                select new OrderLine()
                {
                    ItemId = g.Key.ItemId,
                    WarehouseId = g.Key.WarehouseId,
                    Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
                };
    lines = lines.ToArray();

    order.Lines.Clear();
    order.Lines.AddRange(lines);
    ctx.Orders.Add(order);
    await ctx.SaveChangesAsync(ct);    
})

Результаты забега

HTTP
http_req_duration..............: avg=74.77ms min=2.66ms med=17.56ms max=5.57s p(90)=139.43ms p(95)=291.77ms
  { expected_response:true }...: avg=74.77ms min=2.66ms med=17.56ms max=5.57s p(90)=139.43ms p(95)=291.77ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  666.194313/s

Я подошел к границе Парето-эффективности

  • Выигрыш по сравнению с предыдущим вариантом 7-8%

  • Обновление логики триггеров теперь требует миграций, даже во время написания кода я потратил заметно больше времени, чем в варианте с запросами

  • Код все еще универсальный, для смены провайдера надо будет поменять только пакеты регистрацию в контейнере

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/trigger-and-constraint


Увеличиваем нагрузку

В k6 тесте поменяю количество виртуальных пользователей с 50 до 100

export const options = {
    iterations: itemsCount * warehousesCount * 300,
    vus: 100, // было 50
};

В результате получаю

HTTP
http_req_duration..............: avg=165.63ms min=3.03ms med=27.67ms max=57.48s p(90)=197.06ms p(95)=453.54ms
  { expected_response:true }...: avg=165.63ms min=3.03ms med=27.67ms max=57.48s p(90)=197.06ms p(95)=453.54ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  489.284481/s

В логах Postgres появляется много сообщений `FATAL: sorry, too many clients already`. Повторение запросов Entity Framework помогает и тут, повторяя запросы пока они не выполнятся. Но если увеличить количество vus до 150, то запросы начинают отваливаться по ошибке. По-умолчанию механизм повторений EF Core пытается повторить запрос 5 раз.

В Postgres по умолчанию max_connections=100. Это число можно увеличить, но с ростом количества соединений увеличивается и потребление памяти. Теоретически верхняя граница количества подключений равна максимальному количеству открытых портов, что порядка 65 тысяч. Однако, если доступный объем памяти будет исчерпан, операционная система завершит работу Postgres. Таким образом, всегда существует предел, который может быть превышен при высокой нагрузке.

Для Microsoft SQL Server такой проблемы нет. Для Postgres можно настроить Maximum Pool Size, чтобы подключений в пуле было меньше, чем свободных подключений в postgres. Тогда попытка получить еще одно подключение зависнет в ожидании на Timeout, который по умолчанию 15 сек.

При Maximum Pool Size=80 и vus: 100 результаты такие

HTTP
http_req_duration..............: avg=223.56ms min=32.59ms med=127.14ms max=13.24s p(90)=266.08ms p(95)=470.36ms
  { expected_response:true }...: avg=223.56ms min=32.59ms med=127.14ms max=13.24s p(90)=266.08ms p(95)=470.36ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  666.724807/s

Поэтому далее идут оптимизации, которые вы почти никогда не будете применять на практике.


Очередь запросов к базе данных

Первое правило создания очередей: не создавай очередей (с) Джейсон Стэйтем

Очередь чтения и записи на диск, блокировки в базе данных, механизм повторов, пул соединений, ThreadPool в .net - это все уже очереди, возможно стоит настроить их параметры и не заниматься созданием своих очередей. Поищите альтернативные решения, прежде чем изобретать свою очередь.

Для начала напишу такой код, как хочу получить


app.MapPost("/place-order/", 
            async (Order order, 
                   DbQueueService<StockApiDataContext> worker, // вместо StockApiDataContext ctx
                   CancellationToken ct) =>
{
    var lines = from l in order.Lines
                group l by new { l.ItemId, l.WarehouseId } into g
                orderby g.Key.ItemId, g.Key.WarehouseId
                select new OrderLine()
                {
                    ItemId = g.Key.ItemId,
                    WarehouseId = g.Key.WarehouseId,
                    Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
                };
    lines = lines.ToArray();

    order.Lines.Clear();
    order.Lines.AddRange(lines);

    await worker.ExecuteAsync(async (ctx, ct) => // добавлено
    {
        ctx.Orders.Add(order);
        await ctx.SaveChangesAsync(ct);
    }, ct);    
})

Чтобы такой код заработал мне необходимо зарегистрировать класс в контейнере

builder.Services.Configure<DbQueueServiceOptions>(o => { });
builder.Services.AddSingleton<DbQueueService<StockApiDataContext>>();
// регистрация синглтона в качестве Hosted Service
builder.Services.AddHostedService(sp => 
            sp.GetRequiredService<DbQueueService<StockApiDataContext>>());

Сам класс очереди на базе BackgroundService

internal class DbQueueService<TContext>(
  IOptions<DbQueueServiceOptions> options, 
  IServiceProvider sp) : BackgroundService where TContext : DbContext
{
    private readonly record struct QueueItem(
        TaskCompletionSource Source, 
        Func<TContext, CancellationToken, Task> Action, 
        CancellationToken CancellationToken);

    private Channel<QueueItem> channel = Channel.CreateUnbounded<QueueItem>(new() { SingleReader = true });

    public async Task ExecuteAsync(
        Func<TContext, CancellationToken, Task> action, 
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(action);
        TaskCompletionSource tcs = new();
        await channel.Writer.WriteAsync(new(tcs, action, ct), ct);
        await tcs.Task;
    }

    // From BackgroundService
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var reader = channel.Reader;

        while (await reader.WaitToReadAsync(stoppingToken))
        {
            List<QueueItem> batch = new();
            while (batch.Count < options.Value.MaxItemsInBatch
                    && reader.TryRead(out var item)) batch.Add(item);

            await ProcessBatch(sp, batch, stoppingToken);
        }
    }

    private async Task ProcessBatch(
        IServiceProvider sp, 
        IEnumerable<QueueItem> batch, 
        CancellationToken stoppingToken)
    {
        await using var scope = sp.CreateAsyncScope();
        await using var ctx = scope.ServiceProvider.GetRequiredService<TContext>();
        foreach (var item in batch)
        {
            if (stoppingToken.IsCancellationRequested)
            {
                item.Source.SetCanceled(stoppingToken);
                continue;
            }

            var result = item.Action(ctx, item.CancellationToken);
            await result.WaitAsync(item.CancellationToken)
                        .ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
            item.Source.SetFromTask(result);
        }
    }
}

internal class DbQueueServiceOptions
{
    public int MaxItemsInBatch { get; set; } = 50;
} 
  • Для организации передачи от вызывающего кода к обработчику очереди используется System.Threading.Channels.Channel, в принципе нет ни одного разумного аргумента не использовать эти классы для любых очередей

  • В качестве элемента очереди используется record struct, чтобы сократить количество аллокаций

  • А для передачи результата вызывающему коду используется TaskCompletionSource

  • public ExecuteAsync отправляет элемент в канал и возвращает Task

  • protected override ExecuteAsync ждет когда в канале будет хотя бы один элемент, получает из канала все что есть и отправляет весь батч на обработку.

  • ProcessBatch получает батч, создает контекст EF и в цикле выполняет функции

  • Все выполняется в одном потоке, без конкурентности запросов

Результаты забега (для vus: 100)

HTTP
http_req_duration..............: avg=493.12ms min=8.26ms med=496.76ms max=676.42ms p(90)=510.17ms p(95)=514.03ms
  { expected_response:true }...: avg=493.12ms min=8.26ms med=496.76ms max=676.42ms p(90)=510.17ms p(95)=514.03ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  202.400329/s

Пропускная способность не зависит от количества виртуальных пользователей, но время обработки растет. Запросы приходят быстрее, чем удается их обрабатывать в один поток.

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/write-queue


Параллельная обработка батчей

Предыдущий вариант оказался очень медленным из-за обработки в одном потоке. Чтобы сделать параллельную обработку нескольких батчей достаточно поменять один метод класса очереди

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    List<Task> tasks = new();

    var reader = channel.Reader;
    while (await reader.WaitToReadAsync(stoppingToken))
    {
        List<QueueItem> batch = new();

        if (tasks.Count >= options.Value.MaxConcurrentBatches) 
            await Task.WhenAny(tasks.ToArray());

        while (batch.Count < options.Value.MaxItemsInBatch
                && reader.TryRead(out var item)) batch.Add(item);

        tasks.RemoveAll(t => t.IsCompleted);
        tasks.Add(ProcessBatch(sp, batch, stoppingToken));
    }
    await Task.WhenAny(tasks.ToArray(), stoppingToken);
}

Теперь код не ожидает завершения обработки батча, а записывает задачу в List и продолжает цикл. И только если в List уже достаточно много задач, то мы дожидается завершения любой из них, а уже потом запускает новую.

Будет запускаться параллельно столько батчей, сколько указано в опции MaxConcurrentBatches.

В рамках теста я вычислил оптимальный параметр по умолчанию для моей конфигурации

internal class DbQueueServiceOptions
{
    public int MaxItemsInBatch { get; set; } = 50;
    public int MaxConcurrentBatches { get; set; } = 15;
} 

Результаты забега (для vus: 100)

HTTP
http_req_duration..............: avg=136.85ms min=3.42ms med=98.93ms max=924.03ms p(90)=303.1ms  p(95)=382.81ms
  { expected_response:true }...: avg=136.85ms min=3.42ms med=98.93ms max=924.03ms p(90)=303.1ms  p(95)=382.81ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  728.288949/s

Это еще на 9% лучше, чем результат без очереди. Кроме того снижает количество использованных соединений. В реальности, кроме запросов на создание резерва, будут еще запросы на получение остатков и они будут гораздо меньше упираться в количество соединений.

Но еще раз повторю, что в большинстве случаев вам это не нужно.

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/write-queue-with-parallel


Оптимизация хранения

То что будет дальше вам никогда не понадобится в продуктивных приложениях, даже если вы работает в большом маркетплейсе. Пример только для демонстрации возможностей, особенностей работы и расширения кругозора.

Для дальнейшей оптимизации можно сохранять данные о строках заказа в самом заказе в виде массивов.

Модель будет выглядеть так

[PrimaryKey(nameof(ItemId), nameof(WarehouseId))]
public class Stock
{
    public int ItemId { get; set; }
    public int WarehouseId { get; set; }
    public int Quantity { get; set; }
    public int Reserved { get; set; }
}

public class Order
{
    [Key]
    public Guid Id { get; set; }
    public List<int> ItemIds { get; set; } = [];
    public List<int> WarehouseIds { get; set; } = [];
    public List<int> Quantities { get; set; } = [];
}

А код сервиса так

app.MapPost("/place-order/", async (OrderModel order, 
                                    DbQueueService<StockApiDataContext> worker, 
                                    CancellationToken ct) =>
{
    var lines = from l in order.Lines
                group l by new { l.ItemId, l.WarehouseId } into g
                orderby g.Key.ItemId, g.Key.WarehouseId
                select new
                {
                    g.Key.ItemId,
                    g.Key.WarehouseId,
                    Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
                };

    Order dbOrder = new() { Id = order.Id };
    foreach (var l in lines)
    {
        dbOrder.ItemIds.Add(l.ItemId);
        dbOrder.WarehouseIds.Add(l.WarehouseId);
        dbOrder.Quantities.Add(l.Quantity);
    }
    await worker.ExecuteAsync(async (ctx, ct) =>
    {
        ctx.Orders.Add(dbOrder);
        await ctx.SaveChangesAsync(ct);
    }, ct);    

})

Для входных данных отдельная модель

public record OrderModel(Guid Id, ICollection<OrderLineModel> Lines);
public record OrderLineModel(int ItemId, int WarehouseId, int Quantity);

Самое важное - триггер, который должен обновить остатки. Так как невозможно такой триггер написать на C#, то придется сделать это на pgSQL

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    var tableBuilder = modelBuilder.Entity<Stock>();
    var reservedProp = tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
    var quantityProp = tableBuilder.Property(s => s.Quantity);


    tableBuilder.ToTable(t =>
        t.HasCheckConstraint("check_stock", $"{quantityProp.Metadata.GetColumnName()} >= {reservedProp.Metadata.GetColumnName()}"));

        
    modelBuilder.Entity<Order>()
                .AfterInsert(t =>
                    t.Action(a =>
                        a.ExecuteRawSql("""
                            UPDATE stock
                            SET reserved = reserved + l.q
                            FROM unnest({0},{1},{2}) AS l(i,w,q)
                            WHERE (item_id,warehouse_id) = (l.i,l.w);
                        """,
                        tableRef => tableRef.New.ItemIds,
                        tableRef => tableRef.New.WarehouseIds,
                        tableRef => tableRef.New.Quantities
                        )
                    )
                );
}

Ключевое - функция unnest, которая преобразует массивы в строки.

Но такой код, к сожалению, теряет обновления остатков.

Если после теста выполнить запрос, который разворачивает все массивы из заказов и группирует по item_id и warehouse_id и сравним с количеством зарезервированных товаров в таблице stock, то такой запрос вернет много несовпадений

select s.*, t.r 
from stock s
join (
    select l.i, l.w, SUM(l.q) as r
    from orders o, 
        lateral unnest(o.item_ids,o.warehouse_ids,o.quantities) as l(i,w,q)
    group by 1,2) t on (s.item_id,s.warehouse_id) = (t.i,t.w)
where s.reserved <> t.r

Я не смог победить эту проблему даже ручными блокировками. С запросом ниже все равно появляются потерянные обновления.

  WITH l AS (
      SELECT s.ctid, l.q
      FROM stock s 
      JOIN unnest({0},{1},{2}) AS l(i,w,q)
          on (s.item_id,s.warehouse_id) = (l.i,l.w)
      FOR NO KEY UPDATE
  )
  UPDATE stock s
  SET reserved = reserved + l.q
  FROM l
  WHERE s.ctid = l.ctid;

Если установить MaxConcurrentBatches = 1, то проблема пропадает, но и скорость становится неприемлемо низкой.

Также проблему решает поднятие уровня изоляции до repeatable read, но это вызывает огромное количество конфликтов и скорость также падает.

У меня есть подозрение, что это какой-то баг Postgres, но у меня не хватает знаний ни подтвердить, ни опровергнуть эту гипотезу.

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/store-lines-in-array


Обновление остатков в триггере в цикле

В одном из постов на StackOverflow (к сожалению не сохранил ссылку) увидел совет использовать цикл в триггере.

FOR x IN SELECT l.* 
    FROM UNNEST(NEW.item_ids,NEW.warehouse_ids,NEW.quantities) as l(i,w,q)
LOOP
    UPDATE stock s
        SET reserved = reserved + x.q
    WHERE (s.item_id,s.warehouse_id) = (x.i,x.w);
END LOOP;

Такой триггер решает проблему без уменьшения параллелизма и без повышения уровня изоляции. Но этот триггер требует объявить переменную x в функции триггера, а используемое расширение для EF не дает такой возможности.

Но никто не мешает непосредственно в коде миграции исправить триггер

  migrationBuilder.Sql("""
    CREATE FUNCTION "LC_TRIGGER_AFTER_INSERT_ORDER"() RETURNS trigger as $LC_TRIGGER_AFTER_INSERT_ORDER$
        DECLARE x RECORD;
    BEGIN
        FOR x IN SELECT l.* 
                FROM UNNEST(NEW.item_ids,NEW.warehouse_ids,NEW.quantities) as l(i,w,q)
        LOOP
            UPDATE stock s
                SET reserved = reserved + x.q
            WHERE (s.item_id,s.warehouse_id) = (x.i,x.w);
        END LOOP;              
        RETURN NEW;
    END;
    $LC_TRIGGER_AFTER_INSERT_ORDER$ LANGUAGE plpgsql;
    CREATE TRIGGER LC_TRIGGER_AFTER_INSERT_ORDER AFTER INSERT
    ON "orders"
    FOR EACH ROW EXECUTE PROCEDURE "LC_TRIGGER_AFTER_INSERT_ORDER"();
    """);

Теперь все работает без конфликтов и аномалий

Результаты забега (для vus: 100)

HTTP
http_req_duration..............: avg=133.57ms min=2.87ms med=97.05ms max=949.55ms p(90)=296.95ms p(95)=378.7ms 
  { expected_response:true }...: avg=133.57ms min=2.87ms med=97.05ms max=949.55ms p(90)=296.95ms p(95)=378.7ms 
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  746.43081/s

Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/trigger-with-loop


Идемпотентность

Все предыдущие версии кода будут возвращать ошибку если попытаться создать еще один заказ с тем же Id. Теперь я сделаю код идемпотентным, как в API сервиса, так и на уровне базы данных

app.MapPut("/place-order/{id}", async (Guid id, ICollection<OrderLineModel> lines, 
                                       DbQueueService<StockApiDataContext> worker, 
                                       CancellationToken ct) =>
{
    var q = from l in lines
            group l by new { l.ItemId, l.WarehouseId } into g
            orderby g.Key.ItemId, g.Key.WarehouseId
            select new
            {
                g.Key.ItemId,
                g.Key.WarehouseId,
                Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
            };

    List<int> itemIds = [], warehouseIds = [], quantities = [];
    foreach (var l in q)
    {
        itemIds.Add(l.ItemId);
        warehouseIds.Add(l.WarehouseId);
        quantities.Add(l.Quantity);
    }
    await worker.ExecuteAsync(async (ctx, ct) =>
    {
        var db = ctx.Database;
        await db.CreateExecutionStrategy().ExecuteInTransactionAsync(async ct =>
        {
            await db.ExecuteSqlAsync($"""
                INSERT INTO orders 
                VALUES({id},{itemIds},{warehouseIds},{quantities}) 
                ON CONFLICT (id) DO NOTHING
                """, ct);
        }, ct => Task.FromResult(false), ct);
    }, ct);  
})
  • PUT вместо POST, что сообщает потребителю о том, что метод идемпотентен

  • В url для PUT необходимо указать Id

  • Для вставки строки в БД я использую запрос, который просто ничего не делает, если Id заказа в базе уже есть

Результаты забега (для vus: 100)

HTTP
http_req_duration..............: avg=132ms    min=3.3ms med=94.79ms max=1.21s p(90)=292.3ms  p(95)=378.37ms
  { expected_response:true }...: avg=132ms    min=3.3ms med=94.79ms max=1.21s p(90)=292.3ms  p(95)=378.37ms
http_req_failed................: 0.00%  0 out of 30000
http_reqs......................: 30000  755.053098/s

Пропускная способность на 13% выше варианта без очереди и postgres-специфичных оптимизаций, и всего на 3.7% выше варианта без оптимизаций.

Код смотреть тут

Выводы

  • Задача обновления остатков решается на уровне изоляции read committed без блокировок высокой гранулярности.

  • Закон Парето для оптимизации кода выполняется - 80% достижимого быстродействия достигается при 20% усилий, оставшиеся 20% результата требуют 80% усилий. Главное знать куда прикладывать усилия.

  • Дедлоки и прочие временные ошибки не всегда являются проблемой, иногда они являются частью нормальной работы. Просто включите повторение запросов.

  • Стоит использовать Entity Framework как для управления схемой базы данных, так и для реализации запросов. Можно не использовать change tracker, а использовать EF как генератор команд и интерфейс работы с базой.

Репозиторий со всеми коммитами тут

PS. Я бы остановился на варианте с логикой в базе данных и не стал бы прикручивать очередь, ограничился настройкой параметров пула и таймаутов.

Комментарии (31)


  1. alexfilus
    13.10.2025 09:38

    Для избегания дедлоков ещё есть трюк с сабселектом примерно такого вида

    UPDATE table_name
    SET ...
    WHERE id in (
      SELECT id 
      FROM table_name
      WHERE ...
      ORDER BY id 
      FOR UPDATE 
    )


    тогда блокировки будут браться именно в порядке сортировки


    1. gandjustas Автор
      13.10.2025 09:38

      Увы даже такой вариант у меня теряет обновления при использовании UNNEST. Я все что можно перепробовал.

      А во всех остальных случаях код с ORDER BY ... FOR UPDATE работает не быстрее триггеров. Но FOR UPDATE нельзя написать на EF, только SQL.


      1. alexfilus
        13.10.2025 09:38

        Именно в сочетании с unnest? У нас такое в проде несколько лет уже крутится, проблем не замечено.


        1. gandjustas Автор
          13.10.2025 09:38

          Именно с ним, без repeateable read получаю потерю обновлений при конкурентных обновлениях.


  1. Shadow_ru
    13.10.2025 09:38

    Попытка впихнуть весь поток изменений товаров в одну БД это конечно страшно. Если мы точно знаем, что у нас на складах в разы больше товаров чем дневной заказ все что описано выше ведёт к затратам ресурсов совершенно бессмысленным. Гораздо дешевле разгребать очередь в фоне мелкими воркерами


    1. onets
      13.10.2025 09:38

      В том то и дело - мы точно не знаем. Можно усложнить в стиле озона - сделать батч и перед выполнением посчитать и сравнить кол-ва на складе и во всех заказах. И если точно хватает, то можно и без блокировок.


      1. gandjustas Автор
        13.10.2025 09:38

        Проблема только в том, что между сравнением и записью "точно хватает" может превратиться в "точно не хватает".


    1. gandjustas Автор
      13.10.2025 09:38

      Попытка впихнуть весь поток изменений товаров в одну БД это конечно страшно.

      А что страшного? Нагрузка на железо небольшая, упирается все в блокировки.

      Если мы точно знаем, что у нас на складах в разы больше товаров чем дневной заказ все что описано выше ведёт к затратам ресурсов совершенно бессмысленным

      Какие траты вы называете бессмысленными?

      • Обновление резервов? Его в любом случае надо делать и в любом случае оно будет тормозить

      • Проверка остатков? Это же простое сравнение, если его отключить, то разница незаметна

      • Это я в рамках теста знаю, что все запросы выполнятся успешно, а в жизни вы такого не можете гарантировать. Код корректно отработает в любом случае.

      Гораздо дешевле разгребать очередь в фоне мелкими воркерами

      Если если вдруг окажется, что резерв в очереди не может быть сделан - не осталось запасов на складе, тогда что? А вы уже деньги у клиента списали и он ждет что его заказ скоро приедет.

      Непонятно что вы сэконосить пытаетесь, если вы сделаете очередь и все запросы последовательно будете обрабатывать, то у вас пропускная способность (от вызова метода клиента до бронирования товара на складе) упадет раза в 4 по сравнению с параллельным случаем. Вы сможете с меньшими ресурсами принимать заказы, но без обновления остатков какой в этом смысл?


      1. Shadow_ru
        13.10.2025 09:38

        Нагрузка на железо небольшая, упирается все в блокировки.

        Нет, конечно. Вы загнали систему в пик производительности, и когда 28 декабря норот ринется делать пиковые закупы вечером и и утром все это заклинит, а разгребать будет вторая линия поддержки из пару человек. И не надо писать, что надо заранее железо увеличить, не поможет, надо будет шардировать, отчего вся опитимизация пойдет погулять

        Какие траты вы называете бессмысленными

        Абсолютно все. Зачем мне обновлять резерв прямо сразу, если у меня может быть страховой остаток и мне надо только не давать за некоторое время превысить его.

        Вместо этого делаются жесткие вставки, причем в реляционную БД, и разумеется для ускорения все это будет дерномализовано потому что форейн ключи в шардах будут работать не так что бы быстро.

        Проверки хз зачем, причем непонятно как это будет работать загнаннное в десяток подов, раскиданных по датацентрам.

        не осталось запасов на складе, тогда что? А вы уже деньги у клиента списали

        Значит заказ отменится. Цель - что бы такое было гораздо реже чем покупка. Вы видно в торговле работали примерно никогда, товар может не приехать по такому количеству причин, что "запасы на складе по базе" это такая мизерная часть, что возиться с кровью из глаз что бы вот точно-точно совпало с цыферкой на складе - это наивность. Потому что с утра жахнули при разгрузке два холодоса об асфальт, а списать еще не списали. Защитой является тот самый страховой остаток, поэтому все эти приседания - бессмыслены и даже вредны.

         если вы сделаете очередь и все запросы последовательно будете обрабатывать

        Ну что мешает разгребать ее параллельно, группируя по типам товаров (да у разных групп будет разная логика обработки остатков, вы же учли в своей логике мерные товары и продажу упаковками, да?). Да и последовательно для некоторых товарцев можно, часто два пика закупа бывает- утро и вечер, остальное время стоит, зачем мне железо купленное под пиковую производительность ? Купим пожиже и за ночь разгребем


        1. gandjustas Автор
          13.10.2025 09:38

          Нет, конечно.

          Это вы о чем?

          Вы загнали систему в пик производительности, и когда 28 декабря норот ринется делать пиковые закупы вечером и и утром все это заклинит, а разгребать будет вторая линия поддержки из пару человек.

          Что значит "загнали систему в пик производительности" ? В коде из статьи быстродействие НЕ упирается в железо если что. Железо выдерживает в 5,5 раз больше.

          И не надо писать, что надо заранее железо увеличить, не поможет, надо будет шардировать, отчего вся опитимизация пойдет погулять

          Чему именно помешает шардирование?

          Зачем мне обновлять резерв прямо сразу, если у меня может быть страховой остаток и мне надо только не давать за некоторое время превысить его.

          Что делать если вы превысили страховой остаток? Да и в целом сам маркетплейс остатком не управляет, продавец отправляет на склад по мере готовности и спрос может превысить запасы.

          Значит заказ отменится. Цель - что бы такое было гораздо реже чем покупка.

          Если вы заказываете что-то в магазине, а ваш заказ отменяется, то будете ли вы еще что-то в нем заказывать?

          товар может не приехать по такому количеству причин, что "запасы на складе по базе" это такая мизерная часть, что возиться с кровью из глаз что бы вот точно-точно совпало с цыферкой на складе - это наивность

          Если вы всю обработку делаете в фоне, то количество таких причин конечно увеличится.

          Защитой является тот самый страховой остаток, поэтому все эти приседания - бессмыслены и даже вредны.

          Проблема в том, что в пики распродаж от момента поступления заказа до момента накладывания резерва могут заказать количество товаров в 10 раз превышающее страховой остаток. Вы же заранее не знаете как долго у вас будет очередь обрабатываться при нагрузке и как часто будут приходить заказы.

          И еще раз напомню, что в случае маркетплейса сам маркетплейс пополняет остатки на складе, это делает только продавец. Поэтому у вас может просто не быть страхового остатка.


        1. oracle_schwerpunkte
          13.10.2025 09:38

          Вы видно в торговле работали примерно никогда, товар может не приехать по такому количеству причин, что "запасы на складе по базе" это такая мизерная часть, что возиться с кровью из глаз что бы вот точно-точно совпало с цыферкой на складе - это наивность.

          Но вот конкретно резервирование можно было бы и нормально сделать, без отговорок.


        1. onets
          13.10.2025 09:38

          Много информации и предположений, хотелось бы уточнить некоторые моменты

          1. Сервис из статьи как раз может стоять после очереди и принимать не заказы, а список ItemId и Qty. И до этого может быть некая пред-обработка (упаковок, китов и так далее). Это же просто облегченный пример, чтобы показать, что обычное железо с простым кодом может сделать 600 заказов в секунду.

          2. Если 600 заказов в секунду на один склад и система справляется - то зачем шардирование? Но все же хорошо бы узнать - сколько там на самом деле у озона, возможно у них больше 600 и их усложненное решение оправдано

          3. В любом случае бутылочным горлышком будет не программный код или БД, а люди, которые бегают по складу и собирают заказ с полок. Сколько надо людских и иных ресурсов и складских площадей, чтобы физически обработать 600 заказов в секунду (это порядка 50 млн за 24 часа) на одном складе?

          4. Со страховым остатком разумеется дельная штука - можете чуть детальней рассказать, как вы его будете пересчитывать, чтобы он не пошел в разнос при параллельной обработке? Есть же базовый пример для общепринятого языка программирования - счетчик с interlock или volatile. Тут придется сделать примерно то же самое. И какой будет механизм отката, когда число выйдет за обозначенные рамки?

          5. "Купим пожиже и за ночь разгребем" - в некоторых случаях может быть определен SLA, типа обработать 200 тыщ заказов за 2-3 часа, а не ночью

          6. И еще момент - конкретно в случае с маркет-плейсом, когда резерв товара происходит до момента оплаты и откат резерва, если оплата не прошла. С точки зрения пользователя и клиентской части - это синхронна обработка. Если обрабатывать через очередь, то надо будет усложнять решение, чтобы дружить синхронного клиента и асинхронную очередь. Поэтому альтернатива с синхронным резервом без очередей - вполне себе альтернатива.


    1. GidraVydra
      13.10.2025 09:38

      Если мы точно знаем, что у нас на складах в разы больше товаров чем дневной заказ...

      ...то вы явно не понимаете бизнес-логику маркетплейса.


  1. onets
    13.10.2025 09:38

    Теперь интересно, что на это скажет озон. Они кстати не показали в статье результаты нагрузочный тестов.


    1. onets
      13.10.2025 09:38

      Нашел неподтвержденную инфу в интернетах

      Крупный логистический центр Ozon может обрабатывать более 900 тысяч заказов в день, а распределительный центр Ozon обрабатывает до 600 тысяч заказов в сутки

      Это скорее всего физическая отгрузка. Но для программной системы - это порядка 10 заказов в секунду.


      1. GidraVydra
        13.10.2025 09:38

        Это физические ограничения склада. Озоновская система может только у одного пользователя сформировать несколько сотен заказов за клик, а реальная пиковая производительность их АПК скорее всего на порядок-другой выше.


        1. beskaravaev
          13.10.2025 09:38

          Точных данных по rps я вам не скажу, но направлю на подумать.

          1) Грубо прикидывать rps исходя из кол-ва заказов в день - неправильно, т.к. нагрузка в течении дня неравномернавя

          2) Если перечитать статью, то можно заметить, что с обычной нагрузкой проблем не было. Проблемы были при старте продаж хамеров (товаров по супер низкой цене). Это когда условно 10к товаров сметают за пару секунд. 1 товар в одни руки. При этом далеко не у всех резерв завершается успешно как у вас.

          Теперь вопрос, что будет с вашей системой, когда в неё прилетит под 10к RPS?


  1. alexfilus
    13.10.2025 09:38

    А партиционирование по складам тут не поможет?


    1. onets
      13.10.2025 09:38

      В реальном мире все сложнее, клиент в Москве, а два склада в Москве и Новосибирске - насколько будет дороже и дольше доставка?

      Поэтому хорошо бы сначала выбрать оптимальный склад.


    1. gandjustas Автор
      13.10.2025 09:38

      Вы наверное не до конца поняли проблему.

      Чтобы сделать резервацию надо :

      1. Найти строку в бд

      2. Дождаться снятия блокировки обновления

      3. Повесить свою блокировку обновления

      4. Записать новую строку

      И повторить это от 1 до 10 раз (сколько строк в заказе). И только в самом конце снимаются блокировки.

      То есть следующая транзакция скорее всего зависнет на строке 2 на относительно долгое время. И транзакция за ней тоже будет висеть.

      В тесте, когда выпадали дедлоки, я видел циклы ожиданий, которые состояли из 9 транзакций.

      Блокировка происходит не на диапазонах, не на очереди записи на диск, а просто на ожидании снятия блокировки ОДНОЙ строки в один момент времени. Ни партицирование, ни шардирование тут не поможет.

      Такова цена ACID гарантий при высокой конкурентности. И это самый лучший вариант на самом деле, все альтернативы еще более медленные, как в примере с однопоточной очередью.


      1. alexfilus
        13.10.2025 09:38

        В случае блокировок на 1 строке тут действительно вряд ли можно что-то придумать. Но такое партиционирование немного снизило бы накладные расходы на поиск и управление блокировками.


  1. jakobz
    13.10.2025 09:38

    Вместо самодельных очередей, надо вешать rate limiter на входе. Он поставит http-запросы в очередь, и пустит в параллель сколько скажешь.

    Тогда, если разные куски когда будут бороться за коннекты, не получится что у тебя куча запросов ждет освободившегося коннекта чтобы завершится, а новые все прилетают - и тоже получают шанс сожрать коннект.


    1. gandjustas Автор
      13.10.2025 09:38

      Rate Limiter же обычно 429  возвращает, а не ждет выполнения. Непонятно насколько это лучше, чем клиент просто подождет


      1. onets
        13.10.2025 09:38

        А походу особо и не заморачиваются, как-то был случай с покупкой видеокарты на DNS - все просто подвисло, а потом когда отвисло, они кончились.


        1. jakobz
          13.10.2025 09:38

          Когда заморачиваются - картинку на cdn включают, типа «вас слишком многа», чтобы часть трафика убрать, и хоть для кого-то работать. Когда все подвисает - это как раз когда не подумали, и дали завалить всё и всем.


      1. jakobz
        13.10.2025 09:38

        Там есть очередь же. Настраиваешь чтобы у тебя запросы не запускались в обработку больше, скажем, 10 одновременно. Остальные ждут в очереди сколько скажешь. И дальше крутишь ручки на перф-тесте. Обычно надо чтобы уперлось в CPU базы. Если под недонагружен по cpu, а база еще дышит - можно побольше в параллель навалить.

        База, от того что ей в параллель надо кучу запросов вертеть, быстрее не работает - у нее тоже не бесконечное количество процов. Ей быстрее последовательно все делать плюс-минус по очереди.

        Ну и исключается гонка за коннектами, сотни запросов ждущих блокировки и коннекты к БД. Все что система не переварит - видно по растущей очереди.


  1. perfectdaemon
    13.10.2025 09:38

    В оригинальной статье рассматривалась проблема высокой конкурентности за одну строку (один и тот же товар — условную сковородку, которую хотят купить все) и RPS описан как «тысячи» (но точного числа нет), а у вас вышло 700 RPS на разные товары.

    Не умаляю ваши заслуги и приведенный код, но кажется, что ответ выше неполный :)


    1. gandjustas Автор
      13.10.2025 09:38

      Во-первых в статье написать можно любую цифру

      Во-вторых у меня код выдает 600 заказов в секунду и 300+ обновлений остатков в секунду на каждый товар. Я могу сделать по 100 товаров и 100 складов, распределив равномерно запросы между нему, тогда количество запросов в секунду будет около 2000, а количество броней на один товар в секунду - меньше 100.

      В-третьих у меня код работает на одной машине, в docker в wsl вместе со средой разработки и тестирования. Мы же не знаем какое железо было в оригинальной статье.


  1. Dhwtj
    13.10.2025 09:38

    ALTER TABLE stock ADD CONSTRAINT check_reserved CHECK (reserved <= quantity);

    То есть перенесем один из инвариантов в СУБД

    Убирает необходимость ручной проверки через AnyAsync(...)

    Ещё можно

    var stockRows = await ctx.Stock
        .FromSqlRaw(@"
            SELECT * FROM stock 
            WHERE (item_id, warehouse_id) IN ({0})
            ORDER BY item_id, warehouse_id
            FOR UPDATE",
            string.Join(",", keys.Select(k => $"({k.ItemId},{k.WarehouseId})")))
        .ToListAsync(ct);

    Полностью устраняет дедлоки, но это уже вышел за пределы ORM


    1. gandjustas Автор
      13.10.2025 09:38

      А вы статью прочитали?


    1. onets
      13.10.2025 09:38

      Вот тут поясню (у меня такая же задача, только у меня тут не озон конечно).

      Конфигурация - MySql 8.0 / i7-8700k / 32Gb, на одном складе, 10 items, 100 потоков, 1000 requests

      • Вариант, когда я беру из БД с SELECT FOR UPDATE, обновляю в C# коде и потом сохраняю в БД - в логах консольки деадлоки все равно есть, retry в EF Core срабатывает, цифры в конце сходятся, скорость примерно 20 rps

      • Вариант с хранимкой с SELECT FOR UPDATE, в логах консольки деадлоки есть, retry в EF Core срабатывает, цифры в конце сходятся, скорость примерно 60-90 rps

      • Вариант с хранимкой БЕЗ SELECT FOR UPDATE + без проверки остатка в конце, в логах консольки пусто, цифры в конце сходятся, скорость 400-440 rps

      • Вариант с хранимкой БЕЗ SELECT FOR UPDATE + с проверкой остатка в конце, в логах консольки пусто, цифры в конце сходятся, скорость 300 rps

      И что еще заметил - последние два варианта отрабатывают нормально даже на уровне ReadUncommitted. Меня это смущает, пока не понимаю почему.

      Позже пожалуй поставлю postgresql и проверю тоже самое на нем, интересно узнать разницу между дефолтным mysql и дефолтным postgresql