Я прочитал статью, и меня поразило, сколько сомнительных решений можно использовать для одной простой задачи.
В этой статье я расскажу, как правильно создать сервис для конкурентных обновлений остатков данных в базе данных. Буду использовать .NET, C#, Entity Framework и Postgres.
Условия
В базе данных хранятся остаток товаров и количество зарезервированных товаров по каждой паре (товар, склад)
Инвариант системы - для всех пар (товар, склад) количество зарезервированных товаров должно быть не больше остатка
-
Должен быть метод размещения заказа (резерва), который:
Получает
Idзаказа и массив строк заказа (Id товара,Id склада,количество)Данные о заказе и позициях заказа сохраняет в базе данных, обновляет резервы
Для каждой позиции заказа обновляет количество зарезервированных товаров = количество зарезервированных товаров + количество в заказе
Работает транзакционно - выполняется полностью или не выполняется вовсе, не нарушает инварианты, не влияет на другие операции и другие операции не влияют на результат, если вернул положительный ответ, то данные уже не потеряются (ACID)
Система не должна падать с ошибками под нагрузкой
Каркас приложения
Для создания приложения выполняю команды
dotnet new webapi
dotnet add package Microsoft.EntityFrameworkCore
dotnet add package Microsoft.EntityFrameworkCore.Design
Я буду использовать Entity Framework, так как хочу создавать схему в коде и использовать миграции для обновления.
Модель
[PrimaryKey(nameof(ItemId), nameof(WarehouseId))]
public class Stock
{
public int ItemId { get; set; }
public int WarehouseId { get; set; }
public int Quantity { get; set; }
public int Reserved { get; set; }
}
[JsonObjectCreationHandling(JsonObjectCreationHandling.Populate)]
public class Order
{
[Key]
public Guid Id { get; set; }
public List<OrderLine> Lines { get; } = new List<OrderLine>();
}
[PrimaryKey(nameof(OrderId), nameof(ItemId), nameof(WarehouseId))]
public class OrderLine
{
[JsonIgnore]
public Guid OrderId { get; set; }
public int ItemId { get; set; }
public int WarehouseId { get; set; }
public int Quantity { get; set; }
}
По умолчанию применяем минимум четвертую нормальную форму, чтобы не требовалось массовых операций при добавлении\изменении\удалении одного элемента модели.
Первичные ключи гарантируют уникальность, то есть нельзя будет дважды вставить один заказ или сделать две строки заказа с одним и тем же товаром и складом. Кроме того все базы данных при назначении ключей создают индексы по ключевым полям, что может ускорить поиск.
Класс контекста
public class StockApiDataContext(DbContextOptions<StockApiDataContext> options) : DbContext(options)
{
public DbSet<Order> Orders { get; set; } = null!;
public DbSet<OrderLine> OrderLines { get; set; } = null!;
public DbSet<Stock> Stock { get; set; } = null!;
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
var tableBuilder = modelBuilder.Entity<Stock>();
tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
}
}
Значение по умолчанию для поля Reserved нужно только для более удобного наполнения тестовыми данными.
База данных
В качестве сервера баз данных я буду использовать docker-контейнер, запущенный локально
docker run -d \
--name postgres \
-p 5432:5432 \
-e POSTGRES_PASSWORD=P@ssw0rd \
postgres
В проект добавлю пакеты для работы с Postgres
dotnet add package Npgsql.EntityFrameworkCore.PostgreSQL
dotnet add package EFCore.NamingConventions
Последний пакет дает возможность изменить соглашение именования объектов в базе данных с PascalCase на snake_case, что позволяет писать запросы к postgres без экранирования имен.
Для создания миграций установлю инструменты
dotnet tool install --global dotnet-ef
Код приложения для подключения к базе данных и применения миграций
var builder = WebApplication.CreateBuilder(args);
// ...
builder.Services.AddDbContext<StockApiDataContext>(opt =>
opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"))
.UseSnakeCaseNamingConvention()
);
// ...
var app = builder.Build();
//...
using (var scope = app.Services.CreateScope())
{
var db = scope.ServiceProvider.GetRequiredService<StockApiDataContext>();
await db.Database.MigrateAsync();
}
Теперь можно создать миграции
dotnet ef migrations add Initial
Код сервиса
app.MapPost("/place-order",
async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
})
Данные сущности Order заполняются с помощью привязки значений, при этом поле
Order.Linesне имеет сеттера, но указав атрибутJsonObjectCreationHandling(JsonObjectCreationHandling.Populate), можно заполнять значениями существующие объекты, а не создавать новые.В запросе может прийти множество строк заказа с одинаковыми
ItemIdиWarehouseId, они склеиваются в одну, суммируя количество.
Код только сохраняет заказы в базу, не обновляет и не проверяет остатки. Используем его для получения бейзлайна по быстродействию.
Тест быстродействия
Использую k6 от графаны
winget install k6 --source winget
Код теста
import http from 'k6/http';
function rand(min, max) {
return Math.round(Math.random() * (max - min) + min);
}
const itemsCount = 10;
const warehousesCount = 10;
export const options = {
iterations: itemsCount * warehousesCount * 300,
vus: 50,
};
export default function () {
const url = __ENV.applicationUrl ?? 'http://localhost:5011';
const params = {
headers: {
'Content-Type': 'application/json',
},
};
const count = rand(1, itemsCount);
let lines = [];
for (let index = 0; index < count; index++) {
lines.push({
itemId: rand(1, itemsCount),
warehouseId: rand(1, warehousesCount),
quantity: rand(1, 5)
})
}
const payload = JSON.stringify({ id: crypto.randomUUID(), lines: lines });
http.post(`${url}/place-order`, payload, params);
}
Тест генерирует запрос в котором от 1 до 10 позиций, в которых случайные id для товаров и складов в диапазоне от 1 до 10 и от 1 до 5 единиц товара в каждой позиции.
Тест выполняется параллельно в 50 потоков
Общее количество итераций = 300 х 10 х 10 = 30 000
Так как количество строк заказа будет варьироваться между тестами, то и время забега будет немного отличаться, поэтому я буду делать несколько замеров и выкладывать средний.
Нагрузочный тест
Запуск приложения
dotnet run -c Release -- Logging:LogLevel:Microsoft.EntityFrameworkCore.Database.Command="Error"
По умолчанию ef core все запросы логирует в консоль, чтобы консоль не искажала результаты замера быстродействие я оставил вывод в консоль только ошибок.
Перед каждым забегом я сбрасываю состояние базы данных sql-скриптом
TRUNCATE TABLE orders CASCADE;
TRUNCATE TABLE stock;
INSERT INTO stock
select *, random(10000, 100000),0 as quantity from generate_series(1,10) item_id, generate_series(1,10) warehouse_id;
Запуск теста (в другом терминале)
k6 run k6test.js
Результат
HTTP
http_req_duration..............: avg=16.03ms min=4.85ms med=14.86ms max=385.25ms p(90)=20.7ms p(95)=23.46ms
{ expected_response:true }...: avg=16.03ms min=4.85ms med=14.86ms max=385.25ms p(90)=20.7ms p(95)=23.46ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 3074.002522/s
Это бейзлайн, относительно которого я буду оценивать быстродействие.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/initial
Обновление остатков
Добавлю логику обновления остатков на складах, пока без проверки
app.MapPost("/place-order/",
async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
await using var t = await ctx.Database.BeginTransactionAsync(ct);
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
foreach (var l in order.Lines)
{
await ctx.Stock
.Where(s => s.ItemId == l.ItemId
&& s.WarehouseId == l.WarehouseId)
.ExecuteUpdateAsync(setter =>
setter.SetProperty(s => s.Reserved,
s => s.Reserved + l.Quantity),
ct);
}
await t.CommitAsync(ct);
})
И сразу же получаем взаимоблокировки (deadlock) при нагрузочном тесте и сыпятся ошибки в консоли.
Почему происходит deadlock
База данных имеет по умолчанию уровень изоляции read committed, то есть не может прочитать данные, которые были изменены, но еще не закоммичены другой транзакцией.
Для этого Postgres (и другие СУБД) накладывает блокировку на запись строки и снимает её только в конце транзакции.
-
Когда параллельных транзакций больше одной и они меняют больше одной строки, то может случиться так:
Транзакция А успела заблокировать строку 1
Транзакция Б успела заблокировать строку 2
Транзакция А повисла в ожидании снятия блокировки строки 2
Транзакция Б повисла в ожидании снятия блокировки строки 1
Ситуация, описанная выше, называется взаимоблокировка (deadlock). Postgres (и другие СУБД) определяет, что появилась такая блокировка и отменяет одну из заблокированных транзакций, чтобы другие могли выполняться.
Как бороться со взаимоблокировками
Простое правило:
Все транзакции должны накладывать блокировки в одном и том же порядке
Для этого просто отсортируем позиции по Id товара и склада
// ...
foreach (var l in order.Lines.OrderBy(l => l.ItemId).ThenBy(l => l.WarehouseId))
// ...
Результаты забега
HTTP
http_req_duration..............: avg=92.86ms min=4.45ms med=23.4ms max=6.03s p(90)=186.04ms p(95)=390.5ms
{ expected_response:true }...: avg=92.86ms min=4.45ms med=23.4ms max=6.03s p(90)=186.04ms p(95)=390.5ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 536.144972/s
Из-за ожиданий блокировок некоторые транзакции висят по несколько секунд. Пропускная способность упала почти в 6 раз.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-update-loop
Обновление остатков одним запросом
Предыдущий вариант плох тем, что мы очень много раз бегаем в базу, отправляя отдельные запросы на обновление остатков, хотя саму логику обновления мы можем сделать одним запросом
app.MapPost("/place-order/",
async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
await using var t = await ctx.Database.BeginTransactionAsync(ct);
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
var q = from l in ctx.OrderLines
where l.OrderId == order.Id
join s in ctx.Stock
on new { l.ItemId, l.WarehouseId }
equals new { s.ItemId, s.WarehouseId }
select new { s, l };
await q.ExecuteUpdateAsync(setter =>
setter.SetProperty(x => x.s.Reserved,
x => x.s.Reserved + x.l.Quantity),
ct);
await t.CommitAsync(ct);
})
К сожалению такой код будет вызывать deadlock, так как Postgres не гарантирует в каком порядке будут накладываться блокировки на строки. Это зависит от порядка записей на диске, а порядок записей на диске зависит от предыдущих операций обновления.
Результаты забега при появлении deadlock
HTTP
http_req_duration..............: avg=87.22ms min=4.28ms med=19.75ms max=13.26s p(90)=160.89ms p(95)=326.21ms
{ expected_response:true }...: avg=86.34ms min=4.28ms med=19.74ms max=13.26s p(90)=160.48ms p(95)=321.98ms
http_req_failed................: 0.05% 15 out of 30000
http_reqs......................: 30000 571.569136/s
Результаты забега когда взаимоблокировки не возникают
HTTP
http_req_duration..............: avg=69.83ms min=4.15ms med=19.94ms max=5.32s p(90)=155.74ms p(95)=280.21ms
{ expected_response:true }...: avg=69.83ms min=4.15ms med=19.94ms max=5.32s p(90)=155.74ms p(95)=280.21ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 713.321511/s
Это значительно быстрее, чем обновление остатков с цикле, но нужно как-то бороться с deadlock_ами.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-check-query
Что делать с deadlock
Если вы не можете обеспечить одинаковый порядок блокирования ресурсов, то в общем случае есть две стратегии: выбор более строгого режима изоляции транзакций и увеличение гранулярности блокировок.
Но в Postgres увеличение уровня изоляции до repeatable read или serializable приведет к тому, что ошибок станет еще больше, так как postgres откатывает транзакции при нарушении изоляции.
Увеличение гранулярности может помочь, но:
Это приведет к уменьшению параллельности запросов, вплоть до полостью последовательного выполнения, что снизит пропускную способность
Потребует руками писать SQL-код, так как у EF Core нет методов для ручного управления блокировками
Поэтому лучше смириться с тем, что часть запросов будут отваливаться и просто повторять их. Тем более в EF Core встроен механизм отказоустойчивости, он автоматически повторяет запросы, если считает что ошибка некритична.
Такой механизм полезен не только для Postgres, но и других баз данных, так как они тоже могут выдавать ошибки взаимоблокировок и другие временные (transient) ошибки.
Повторение запросов
Для повторения запросов надо это повторение включить
// ...
builder.Services.AddDbContext<StockApiDataContext>(opt =>
opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"),
npgOptions => npgOptions.EnableRetryOnFailure()) // включить повторение запросов
.UseSnakeCaseNamingConvention()
);
// ...
А также нужно использовать "стратегию" в коде, чтобы группа запросов повторялась в одной транзакции
app.MapPost("/place-order/",
async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId // добавлено
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
var db = ctx.Database;
await db.CreateExecutionStrategy().ExecuteInTransactionAsync(async ct =>
{
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
var q = from l in ctx.OrderLines
where l.OrderId == order.Id
join s in ctx.Stock
on new { l.ItemId, l.WarehouseId }
equals new { s.ItemId, s.WarehouseId }
select new { s, l };
await q.ExecuteUpdateAsync(setter =>
setter.SetProperty(x => x.s.Reserved,
x => x.s.Reserved + x.l.Quantity),
ct);
}, ct => Task.FromResult(false), ct);
})
Также добавил сортировку строк по ItemId и WarehouseId перед добавлением в базу данных, чтобы сократить количество взаимоблокировок.
Результаты забега
HTTP
http_req_duration..............: avg=74.72ms min=4.56ms med=19.61ms max=6.32s p(90)=146.08ms p(95)=284.93ms
{ expected_response:true }...: avg=74.72ms min=4.56ms med=19.61ms max=6.32s p(90)=146.08ms p(95)=284.93ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 666.887768/s
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-update-query-retry
Проверка остатков
Для этого достаточно добавить одну строку в код выше
await q.ExecuteUpdateAsync(/* прощено */);
// Проверка остатков
if (await q.AnyAsync(x =>
x.s.Quantity < x.s.Reserved,
ct)) throw new Exception("Oversell");
Так как предыдущий код, обновляющий остатки, уже навешивает блокировки на все строки, то проверка этих же строк будет изолирована от других транзакций.
Если инвариант будет нарушен, но EF Core не будет повторять запрос, так как такая ошибка не считается "временной".
Результаты забега
HTTP
http_req_duration..............: avg=79.47ms min=4.68ms med=21.88ms max=4.36s p(90)=168.33ms p(95)=320.29ms
{ expected_response:true }...: avg=79.47ms min=4.68ms med=21.88ms max=4.36s p(90)=168.33ms p(95)=320.29ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 627.168904/s
Нагрузочный тест делается в условиях когда запасы заведомо превышают все резервы заказов, так как ошибки сильно влияют на время выполнения запросов.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-check-query
Итого
Вся необходимая логика реализована, без ручных блокировок и на уровне изоляции read committed
В каждом запросе среднем 5 позиций заказа, получаем (627.168904*5)=3 135.84452 обновлений остатков в секунду
В тесте всего 10 разных товаров, на один товар приходится (3 135.84452/10)=313.584452, округленно 314 обновлений остатков в секунду
Это на домашнем компе, с Postgres в docker в wsl
Во время теста показатели скорости записи на диск не превышали 5 мб\сек
Мне кажется что 600 заказов в секунду и 300 резервов на один товар это вполне достаточная производительность для любого маркетплейса.
Логика в базе данных
По житейской логике заказ всегда должен создаваться вместе с резервом товаров по этому заказу. Поэтому нет необходимости каждый раз из приложения отправлять одни и те же запросы на обновление и контроль остатков. В базе данных есть средства делать это автоматически.
На этом месте общественность может возмутиться: "как же так, бизнес-логика в базе данных это плохо". Но плохо это, если бизнес-логика в базе данных обновляется и проверяется независимо от приложения. Тогда очень легко допустить несогласованные изменения.
К счастью для EF есть расширение, которое позволяет делать триггеры в базе данных на C#
dotnet add package Laraue.EfCoreTriggers.PostgreSql
Нужно добавить расширение к контексту
builder.Services.AddDbContext<StockApiDataContext>(opt =>
opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"),
npgOptions => npgOptions.EnableRetryOnFailure())
.UseSnakeCaseNamingConvention()
.UsePostgreSqlTriggers() // добавлено
);
И добавить необходимые триггеры и проверки в модель
// Изменения в классе StockApiDataContext, остальной код не меняется
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
var tableBuilder = modelBuilder.Entity<Stock>();
var reservedProp = tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
var quantityProp = tableBuilder.Property(s => s.Quantity);
tableBuilder.ToTable(t =>
t.HasCheckConstraint("check_stock", $"{quantityProp.Metadata.GetColumnName()} >= {reservedProp.Metadata.GetColumnName()}"));
modelBuilder.Entity<OrderLine>()
.AfterInsert(t =>
t.Action(a =>
a.Update<Stock>(
(l, s) => s.ItemId == l.New.ItemId
&& s.WarehouseId == l.New.WarehouseId,
(l, s) => new Stock {
Reserved = s.Reserved + l.New.Quantity }
)
)
);
}
Такой код создает триггер, который на каждое добавление OrderLine вызывает обновление таблицы Stock. Контроль остатков выполняется обычным check constraint.
Код метода при этом упрощается
app.MapPost("/place-order/", async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
})
Результаты забега
HTTP
http_req_duration..............: avg=74.77ms min=2.66ms med=17.56ms max=5.57s p(90)=139.43ms p(95)=291.77ms
{ expected_response:true }...: avg=74.77ms min=2.66ms med=17.56ms max=5.57s p(90)=139.43ms p(95)=291.77ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 666.194313/s
Я подошел к границе Парето-эффективности
Выигрыш по сравнению с предыдущим вариантом 7-8%
Обновление логики триггеров теперь требует миграций, даже во время написания кода я потратил заметно больше времени, чем в варианте с запросами
Код все еще универсальный, для смены провайдера надо будет поменять только пакеты регистрацию в контейнере
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/trigger-and-constraint
Увеличиваем нагрузку
В k6 тесте поменяю количество виртуальных пользователей с 50 до 100
export const options = {
iterations: itemsCount * warehousesCount * 300,
vus: 100, // было 50
};
В результате получаю
HTTP
http_req_duration..............: avg=165.63ms min=3.03ms med=27.67ms max=57.48s p(90)=197.06ms p(95)=453.54ms
{ expected_response:true }...: avg=165.63ms min=3.03ms med=27.67ms max=57.48s p(90)=197.06ms p(95)=453.54ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 489.284481/s
В логах Postgres появляется много сообщений `FATAL: sorry, too many clients already`. Повторение запросов Entity Framework помогает и тут, повторяя запросы пока они не выполнятся. Но если увеличить количество vus до 150, то запросы начинают отваливаться по ошибке. По-умолчанию механизм повторений EF Core пытается повторить запрос 5 раз.
В Postgres по умолчанию max_connections=100. Это число можно увеличить, но с ростом количества соединений увеличивается и потребление памяти. Теоретически верхняя граница количества подключений равна максимальному количеству открытых портов, что порядка 65 тысяч. Однако, если доступный объем памяти будет исчерпан, операционная система завершит работу Postgres. Таким образом, всегда существует предел, который может быть превышен при высокой нагрузке.
Для Microsoft SQL Server такой проблемы нет. Для Postgres можно настроить Maximum Pool Size, чтобы подключений в пуле было меньше, чем свободных подключений в postgres. Тогда попытка получить еще одно подключение зависнет в ожидании на Timeout, который по умолчанию 15 сек.
При Maximum Pool Size=80 и vus: 100 результаты такие
HTTP
http_req_duration..............: avg=223.56ms min=32.59ms med=127.14ms max=13.24s p(90)=266.08ms p(95)=470.36ms
{ expected_response:true }...: avg=223.56ms min=32.59ms med=127.14ms max=13.24s p(90)=266.08ms p(95)=470.36ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 666.724807/s
Поэтому далее идут оптимизации, которые вы почти никогда не будете применять на практике.
Очередь запросов к базе данных
Первое правило создания очередей: не создавай очередей (с) Джейсон Стэйтем
Очередь чтения и записи на диск, блокировки в базе данных, механизм повторов, пул соединений, ThreadPool в .net - это все уже очереди, возможно стоит настроить их параметры и не заниматься созданием своих очередей. Поищите альтернативные решения, прежде чем изобретать свою очередь.
Для начала напишу такой код, как хочу получить
app.MapPost("/place-order/",
async (Order order,
DbQueueService<StockApiDataContext> worker, // вместо StockApiDataContext ctx
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
await worker.ExecuteAsync(async (ctx, ct) => // добавлено
{
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
}, ct);
})
Чтобы такой код заработал мне необходимо зарегистрировать класс в контейнере
builder.Services.Configure<DbQueueServiceOptions>(o => { });
builder.Services.AddSingleton<DbQueueService<StockApiDataContext>>();
// регистрация синглтона в качестве Hosted Service
builder.Services.AddHostedService(sp =>
sp.GetRequiredService<DbQueueService<StockApiDataContext>>());
Сам класс очереди на базе BackgroundService
internal class DbQueueService<TContext>(
IOptions<DbQueueServiceOptions> options,
IServiceProvider sp) : BackgroundService where TContext : DbContext
{
private readonly record struct QueueItem(
TaskCompletionSource Source,
Func<TContext, CancellationToken, Task> Action,
CancellationToken CancellationToken);
private Channel<QueueItem> channel = Channel.CreateUnbounded<QueueItem>(new() { SingleReader = true });
public async Task ExecuteAsync(
Func<TContext, CancellationToken, Task> action,
CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(action);
TaskCompletionSource tcs = new();
await channel.Writer.WriteAsync(new(tcs, action, ct), ct);
await tcs.Task;
}
// From BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var reader = channel.Reader;
while (await reader.WaitToReadAsync(stoppingToken))
{
List<QueueItem> batch = new();
while (batch.Count < options.Value.MaxItemsInBatch
&& reader.TryRead(out var item)) batch.Add(item);
await ProcessBatch(sp, batch, stoppingToken);
}
}
private async Task ProcessBatch(
IServiceProvider sp,
IEnumerable<QueueItem> batch,
CancellationToken stoppingToken)
{
await using var scope = sp.CreateAsyncScope();
await using var ctx = scope.ServiceProvider.GetRequiredService<TContext>();
foreach (var item in batch)
{
if (stoppingToken.IsCancellationRequested)
{
item.Source.SetCanceled(stoppingToken);
continue;
}
var result = item.Action(ctx, item.CancellationToken);
await result.WaitAsync(item.CancellationToken)
.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
item.Source.SetFromTask(result);
}
}
}
internal class DbQueueServiceOptions
{
public int MaxItemsInBatch { get; set; } = 50;
}
Для организации передачи от вызывающего кода к обработчику очереди используется System.Threading.Channels.Channel, в принципе нет ни одного разумного аргумента не использовать эти классы для любых очередей
В качестве элемента очереди используется
record struct, чтобы сократить количество аллокацийА для передачи результата вызывающему коду используется TaskCompletionSource
public ExecuteAsyncотправляет элемент в канал и возвращает Taskprotected override ExecuteAsyncждет когда в канале будет хотя бы один элемент, получает из канала все что есть и отправляет весь батч на обработку.ProcessBatchполучает батч, создает контекст EF и в цикле выполняет функцииВсе выполняется в одном потоке, без конкурентности запросов
Результаты забега (для vus: 100)
HTTP
http_req_duration..............: avg=493.12ms min=8.26ms med=496.76ms max=676.42ms p(90)=510.17ms p(95)=514.03ms
{ expected_response:true }...: avg=493.12ms min=8.26ms med=496.76ms max=676.42ms p(90)=510.17ms p(95)=514.03ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 202.400329/s
Пропускная способность не зависит от количества виртуальных пользователей, но время обработки растет. Запросы приходят быстрее, чем удается их обрабатывать в один поток.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/write-queue
Параллельная обработка батчей
Предыдущий вариант оказался очень медленным из-за обработки в одном потоке. Чтобы сделать параллельную обработку нескольких батчей достаточно поменять один метод класса очереди
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
List<Task> tasks = new();
var reader = channel.Reader;
while (await reader.WaitToReadAsync(stoppingToken))
{
List<QueueItem> batch = new();
if (tasks.Count >= options.Value.MaxConcurrentBatches)
await Task.WhenAny(tasks.ToArray());
while (batch.Count < options.Value.MaxItemsInBatch
&& reader.TryRead(out var item)) batch.Add(item);
tasks.RemoveAll(t => t.IsCompleted);
tasks.Add(ProcessBatch(sp, batch, stoppingToken));
}
await Task.WhenAny(tasks.ToArray(), stoppingToken);
}
Теперь код не ожидает завершения обработки батча, а записывает задачу в List и продолжает цикл. И только если в List уже достаточно много задач, то мы дожидается завершения любой из них, а уже потом запускает новую.
Будет запускаться параллельно столько батчей, сколько указано в опции MaxConcurrentBatches.
В рамках теста я вычислил оптимальный параметр по умолчанию для моей конфигурации
internal class DbQueueServiceOptions
{
public int MaxItemsInBatch { get; set; } = 50;
public int MaxConcurrentBatches { get; set; } = 15;
}
Результаты забега (для vus: 100)
HTTP
http_req_duration..............: avg=136.85ms min=3.42ms med=98.93ms max=924.03ms p(90)=303.1ms p(95)=382.81ms
{ expected_response:true }...: avg=136.85ms min=3.42ms med=98.93ms max=924.03ms p(90)=303.1ms p(95)=382.81ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 728.288949/s
Это еще на 9% лучше, чем результат без очереди. Кроме того снижает количество использованных соединений. В реальности, кроме запросов на создание резерва, будут еще запросы на получение остатков и они будут гораздо меньше упираться в количество соединений.
Но еще раз повторю, что в большинстве случаев вам это не нужно.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/write-queue-with-parallel
Оптимизация хранения
То что будет дальше вам никогда не понадобится в продуктивных приложениях, даже если вы работает в большом маркетплейсе. Пример только для демонстрации возможностей, особенностей работы и расширения кругозора.
Для дальнейшей оптимизации можно сохранять данные о строках заказа в самом заказе в виде массивов.
Модель будет выглядеть так
[PrimaryKey(nameof(ItemId), nameof(WarehouseId))]
public class Stock
{
public int ItemId { get; set; }
public int WarehouseId { get; set; }
public int Quantity { get; set; }
public int Reserved { get; set; }
}
public class Order
{
[Key]
public Guid Id { get; set; }
public List<int> ItemIds { get; set; } = [];
public List<int> WarehouseIds { get; set; } = [];
public List<int> Quantities { get; set; } = [];
}
А код сервиса так
app.MapPost("/place-order/", async (OrderModel order,
DbQueueService<StockApiDataContext> worker,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId
select new
{
g.Key.ItemId,
g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
Order dbOrder = new() { Id = order.Id };
foreach (var l in lines)
{
dbOrder.ItemIds.Add(l.ItemId);
dbOrder.WarehouseIds.Add(l.WarehouseId);
dbOrder.Quantities.Add(l.Quantity);
}
await worker.ExecuteAsync(async (ctx, ct) =>
{
ctx.Orders.Add(dbOrder);
await ctx.SaveChangesAsync(ct);
}, ct);
})
Для входных данных отдельная модель
public record OrderModel(Guid Id, ICollection<OrderLineModel> Lines);
public record OrderLineModel(int ItemId, int WarehouseId, int Quantity);
Самое важное - триггер, который должен обновить остатки. Так как невозможно такой триггер написать на C#, то придется сделать это на pgSQL
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
var tableBuilder = modelBuilder.Entity<Stock>();
var reservedProp = tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
var quantityProp = tableBuilder.Property(s => s.Quantity);
tableBuilder.ToTable(t =>
t.HasCheckConstraint("check_stock", $"{quantityProp.Metadata.GetColumnName()} >= {reservedProp.Metadata.GetColumnName()}"));
modelBuilder.Entity<Order>()
.AfterInsert(t =>
t.Action(a =>
a.ExecuteRawSql("""
UPDATE stock
SET reserved = reserved + l.q
FROM unnest({0},{1},{2}) AS l(i,w,q)
WHERE (item_id,warehouse_id) = (l.i,l.w);
""",
tableRef => tableRef.New.ItemIds,
tableRef => tableRef.New.WarehouseIds,
tableRef => tableRef.New.Quantities
)
)
);
}
Ключевое - функция unnest, которая преобразует массивы в строки.
Но такой код, к сожалению, теряет обновления остатков.
Если после теста выполнить запрос, который разворачивает все массивы из заказов и группирует по item_id и warehouse_id и сравним с количеством зарезервированных товаров в таблице stock, то такой запрос вернет много несовпадений
select s.*, t.r
from stock s
join (
select l.i, l.w, SUM(l.q) as r
from orders o,
lateral unnest(o.item_ids,o.warehouse_ids,o.quantities) as l(i,w,q)
group by 1,2) t on (s.item_id,s.warehouse_id) = (t.i,t.w)
where s.reserved <> t.r
Я не смог победить эту проблему даже ручными блокировками. С запросом ниже все равно появляются потерянные обновления.
WITH l AS (
SELECT s.ctid, l.q
FROM stock s
JOIN unnest({0},{1},{2}) AS l(i,w,q)
on (s.item_id,s.warehouse_id) = (l.i,l.w)
FOR NO KEY UPDATE
)
UPDATE stock s
SET reserved = reserved + l.q
FROM l
WHERE s.ctid = l.ctid;
Если установить MaxConcurrentBatches = 1, то проблема пропадает, но и скорость становится неприемлемо низкой.
Также проблему решает поднятие уровня изоляции до repeatable read, но это вызывает огромное количество конфликтов и скорость также падает.
У меня есть подозрение, что это какой-то баг Postgres, но у меня не хватает знаний ни подтвердить, ни опровергнуть эту гипотезу.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/store-lines-in-array
Обновление остатков в триггере в цикле
В одном из постов на StackOverflow (к сожалению не сохранил ссылку) увидел совет использовать цикл в триггере.
FOR x IN SELECT l.*
FROM UNNEST(NEW.item_ids,NEW.warehouse_ids,NEW.quantities) as l(i,w,q)
LOOP
UPDATE stock s
SET reserved = reserved + x.q
WHERE (s.item_id,s.warehouse_id) = (x.i,x.w);
END LOOP;
Такой триггер решает проблему без уменьшения параллелизма и без повышения уровня изоляции. Но этот триггер требует объявить переменную x в функции триггера, а используемое расширение для EF не дает такой возможности.
Но никто не мешает непосредственно в коде миграции исправить триггер
migrationBuilder.Sql("""
CREATE FUNCTION "LC_TRIGGER_AFTER_INSERT_ORDER"() RETURNS trigger as $LC_TRIGGER_AFTER_INSERT_ORDER$
DECLARE x RECORD;
BEGIN
FOR x IN SELECT l.*
FROM UNNEST(NEW.item_ids,NEW.warehouse_ids,NEW.quantities) as l(i,w,q)
LOOP
UPDATE stock s
SET reserved = reserved + x.q
WHERE (s.item_id,s.warehouse_id) = (x.i,x.w);
END LOOP;
RETURN NEW;
END;
$LC_TRIGGER_AFTER_INSERT_ORDER$ LANGUAGE plpgsql;
CREATE TRIGGER LC_TRIGGER_AFTER_INSERT_ORDER AFTER INSERT
ON "orders"
FOR EACH ROW EXECUTE PROCEDURE "LC_TRIGGER_AFTER_INSERT_ORDER"();
""");
Теперь все работает без конфликтов и аномалий
Результаты забега (для vus: 100)
HTTP
http_req_duration..............: avg=133.57ms min=2.87ms med=97.05ms max=949.55ms p(90)=296.95ms p(95)=378.7ms
{ expected_response:true }...: avg=133.57ms min=2.87ms med=97.05ms max=949.55ms p(90)=296.95ms p(95)=378.7ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 746.43081/s
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/trigger-with-loop
Идемпотентность
Все предыдущие версии кода будут возвращать ошибку если попытаться создать еще один заказ с тем же Id. Теперь я сделаю код идемпотентным, как в API сервиса, так и на уровне базы данных
app.MapPut("/place-order/{id}", async (Guid id, ICollection<OrderLineModel> lines,
DbQueueService<StockApiDataContext> worker,
CancellationToken ct) =>
{
var q = from l in lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId
select new
{
g.Key.ItemId,
g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
List<int> itemIds = [], warehouseIds = [], quantities = [];
foreach (var l in q)
{
itemIds.Add(l.ItemId);
warehouseIds.Add(l.WarehouseId);
quantities.Add(l.Quantity);
}
await worker.ExecuteAsync(async (ctx, ct) =>
{
var db = ctx.Database;
await db.CreateExecutionStrategy().ExecuteInTransactionAsync(async ct =>
{
await db.ExecuteSqlAsync($"""
INSERT INTO orders
VALUES({id},{itemIds},{warehouseIds},{quantities})
ON CONFLICT (id) DO NOTHING
""", ct);
}, ct => Task.FromResult(false), ct);
}, ct);
})
PUTвместоPOST, что сообщает потребителю о том, что метод идемпотентенВ url для
PUTнеобходимо указатьIdДля вставки строки в БД я использую запрос, который просто ничего не делает, если
Idзаказа в базе уже есть
Результаты забега (для vus: 100)
HTTP
http_req_duration..............: avg=132ms min=3.3ms med=94.79ms max=1.21s p(90)=292.3ms p(95)=378.37ms
{ expected_response:true }...: avg=132ms min=3.3ms med=94.79ms max=1.21s p(90)=292.3ms p(95)=378.37ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 755.053098/s
Пропускная способность на 13% выше варианта без очереди и postgres-специфичных оптимизаций, и всего на 3.7% выше варианта без оптимизаций.
Выводы
Задача обновления остатков решается на уровне изоляции read committed без блокировок высокой гранулярности.
Закон Парето для оптимизации кода выполняется - 80% достижимого быстродействия достигается при 20% усилий, оставшиеся 20% результата требуют 80% усилий. Главное знать куда прикладывать усилия.
Дедлоки и прочие временные ошибки не всегда являются проблемой, иногда они являются частью нормальной работы. Просто включите повторение запросов.
Стоит использовать Entity Framework как для управления схемой базы данных, так и для реализации запросов. Можно не использовать change tracker, а использовать EF как генератор команд и интерфейс работы с базой.
Репозиторий со всеми коммитами тут
PS. Я бы остановился на варианте с логикой в базе данных и не стал бы прикручивать очередь, ограничился настройкой параметров пула и таймаутов.
Комментарии (31)

Shadow_ru
13.10.2025 09:38Попытка впихнуть весь поток изменений товаров в одну БД это конечно страшно. Если мы точно знаем, что у нас на складах в разы больше товаров чем дневной заказ все что описано выше ведёт к затратам ресурсов совершенно бессмысленным. Гораздо дешевле разгребать очередь в фоне мелкими воркерами

onets
13.10.2025 09:38В том то и дело - мы точно не знаем. Можно усложнить в стиле озона - сделать батч и перед выполнением посчитать и сравнить кол-ва на складе и во всех заказах. И если точно хватает, то можно и без блокировок.

gandjustas Автор
13.10.2025 09:38Проблема только в том, что между сравнением и записью "точно хватает" может превратиться в "точно не хватает".

gandjustas Автор
13.10.2025 09:38Попытка впихнуть весь поток изменений товаров в одну БД это конечно страшно.
А что страшного? Нагрузка на железо небольшая, упирается все в блокировки.
Если мы точно знаем, что у нас на складах в разы больше товаров чем дневной заказ все что описано выше ведёт к затратам ресурсов совершенно бессмысленным
Какие траты вы называете бессмысленными?
Обновление резервов? Его в любом случае надо делать и в любом случае оно будет тормозить
Проверка остатков? Это же простое сравнение, если его отключить, то разница незаметна
Это я в рамках теста знаю, что все запросы выполнятся успешно, а в жизни вы такого не можете гарантировать. Код корректно отработает в любом случае.
Гораздо дешевле разгребать очередь в фоне мелкими воркерами
Если если вдруг окажется, что резерв в очереди не может быть сделан - не осталось запасов на складе, тогда что? А вы уже деньги у клиента списали и он ждет что его заказ скоро приедет.
Непонятно что вы сэконосить пытаетесь, если вы сделаете очередь и все запросы последовательно будете обрабатывать, то у вас пропускная способность (от вызова метода клиента до бронирования товара на складе) упадет раза в 4 по сравнению с параллельным случаем. Вы сможете с меньшими ресурсами принимать заказы, но без обновления остатков какой в этом смысл?

Shadow_ru
13.10.2025 09:38Нагрузка на железо небольшая, упирается все в блокировки.
Нет, конечно. Вы загнали систему в пик производительности, и когда 28 декабря норот ринется делать пиковые закупы вечером и и утром все это заклинит, а разгребать будет вторая линия поддержки из пару человек. И не надо писать, что надо заранее железо увеличить, не поможет, надо будет шардировать, отчего вся опитимизация пойдет погулять
Какие траты вы называете бессмысленными
Абсолютно все. Зачем мне обновлять резерв прямо сразу, если у меня может быть страховой остаток и мне надо только не давать за некоторое время превысить его.
Вместо этого делаются жесткие вставки, причем в реляционную БД, и разумеется для ускорения все это будет дерномализовано потому что форейн ключи в шардах будут работать не так что бы быстро.
Проверки хз зачем, причем непонятно как это будет работать загнаннное в десяток подов, раскиданных по датацентрам.
не осталось запасов на складе, тогда что? А вы уже деньги у клиента списали
Значит заказ отменится. Цель - что бы такое было гораздо реже чем покупка. Вы видно в торговле работали примерно никогда, товар может не приехать по такому количеству причин, что "запасы на складе по базе" это такая мизерная часть, что возиться с кровью из глаз что бы вот точно-точно совпало с цыферкой на складе - это наивность. Потому что с утра жахнули при разгрузке два холодоса об асфальт, а списать еще не списали. Защитой является тот самый страховой остаток, поэтому все эти приседания - бессмыслены и даже вредны.
если вы сделаете очередь и все запросы последовательно будете обрабатывать
Ну что мешает разгребать ее параллельно, группируя по типам товаров (да у разных групп будет разная логика обработки остатков, вы же учли в своей логике мерные товары и продажу упаковками, да?). Да и последовательно для некоторых товарцев можно, часто два пика закупа бывает- утро и вечер, остальное время стоит, зачем мне железо купленное под пиковую производительность ? Купим пожиже и за ночь разгребем

gandjustas Автор
13.10.2025 09:38Нет, конечно.
Это вы о чем?
Вы загнали систему в пик производительности, и когда 28 декабря норот ринется делать пиковые закупы вечером и и утром все это заклинит, а разгребать будет вторая линия поддержки из пару человек.
Что значит "загнали систему в пик производительности" ? В коде из статьи быстродействие НЕ упирается в железо если что. Железо выдерживает в 5,5 раз больше.
И не надо писать, что надо заранее железо увеличить, не поможет, надо будет шардировать, отчего вся опитимизация пойдет погулять
Чему именно помешает шардирование?
Зачем мне обновлять резерв прямо сразу, если у меня может быть страховой остаток и мне надо только не давать за некоторое время превысить его.
Что делать если вы превысили страховой остаток? Да и в целом сам маркетплейс остатком не управляет, продавец отправляет на склад по мере готовности и спрос может превысить запасы.
Значит заказ отменится. Цель - что бы такое было гораздо реже чем покупка.
Если вы заказываете что-то в магазине, а ваш заказ отменяется, то будете ли вы еще что-то в нем заказывать?
товар может не приехать по такому количеству причин, что "запасы на складе по базе" это такая мизерная часть, что возиться с кровью из глаз что бы вот точно-точно совпало с цыферкой на складе - это наивность
Если вы всю обработку делаете в фоне, то количество таких причин конечно увеличится.
Защитой является тот самый страховой остаток, поэтому все эти приседания - бессмыслены и даже вредны.
Проблема в том, что в пики распродаж от момента поступления заказа до момента накладывания резерва могут заказать количество товаров в 10 раз превышающее страховой остаток. Вы же заранее не знаете как долго у вас будет очередь обрабатываться при нагрузке и как часто будут приходить заказы.
И еще раз напомню, что в случае маркетплейса сам маркетплейс пополняет остатки на складе, это делает только продавец. Поэтому у вас может просто не быть страхового остатка.

oracle_schwerpunkte
13.10.2025 09:38Вы видно в торговле работали примерно никогда, товар может не приехать по такому количеству причин, что "запасы на складе по базе" это такая мизерная часть, что возиться с кровью из глаз что бы вот точно-точно совпало с цыферкой на складе - это наивность.
Но вот конкретно резервирование можно было бы и нормально сделать, без отговорок.

onets
13.10.2025 09:38Много информации и предположений, хотелось бы уточнить некоторые моменты
Сервис из статьи как раз может стоять после очереди и принимать не заказы, а список ItemId и Qty. И до этого может быть некая пред-обработка (упаковок, китов и так далее). Это же просто облегченный пример, чтобы показать, что обычное железо с простым кодом может сделать 600 заказов в секунду.
Если 600 заказов в секунду на один склад и система справляется - то зачем шардирование? Но все же хорошо бы узнать - сколько там на самом деле у озона, возможно у них больше 600 и их усложненное решение оправдано
В любом случае бутылочным горлышком будет не программный код или БД, а люди, которые бегают по складу и собирают заказ с полок. Сколько надо людских и иных ресурсов и складских площадей, чтобы физически обработать 600 заказов в секунду (это порядка 50 млн за 24 часа) на одном складе?
Со страховым остатком разумеется дельная штука - можете чуть детальней рассказать, как вы его будете пересчитывать, чтобы он не пошел в разнос при параллельной обработке? Есть же базовый пример для общепринятого языка программирования - счетчик с interlock или volatile. Тут придется сделать примерно то же самое. И какой будет механизм отката, когда число выйдет за обозначенные рамки?
"Купим пожиже и за ночь разгребем" - в некоторых случаях может быть определен SLA, типа обработать 200 тыщ заказов за 2-3 часа, а не ночью
И еще момент - конкретно в случае с маркет-плейсом, когда резерв товара происходит до момента оплаты и откат резерва, если оплата не прошла. С точки зрения пользователя и клиентской части - это синхронна обработка. Если обрабатывать через очередь, то надо будет усложнять решение, чтобы дружить синхронного клиента и асинхронную очередь. Поэтому альтернатива с синхронным резервом без очередей - вполне себе альтернатива.

GidraVydra
13.10.2025 09:38Если мы точно знаем, что у нас на складах в разы больше товаров чем дневной заказ...
...то вы явно не понимаете бизнес-логику маркетплейса.

onets
13.10.2025 09:38Теперь интересно, что на это скажет озон. Они кстати не показали в статье результаты нагрузочный тестов.

onets
13.10.2025 09:38Нашел неподтвержденную инфу в интернетах
Крупный логистический центр Ozon может обрабатывать более 900 тысяч заказов в день, а распределительный центр Ozon обрабатывает до 600 тысяч заказов в сутки
Это скорее всего физическая отгрузка. Но для программной системы - это порядка 10 заказов в секунду.

GidraVydra
13.10.2025 09:38Это физические ограничения склада. Озоновская система может только у одного пользователя сформировать несколько сотен заказов за клик, а реальная пиковая производительность их АПК скорее всего на порядок-другой выше.

beskaravaev
13.10.2025 09:38Точных данных по rps я вам не скажу, но направлю на подумать.
1) Грубо прикидывать rps исходя из кол-ва заказов в день - неправильно, т.к. нагрузка в течении дня неравномернавя
2) Если перечитать статью, то можно заметить, что с обычной нагрузкой проблем не было. Проблемы были при старте продаж хамеров (товаров по супер низкой цене). Это когда условно 10к товаров сметают за пару секунд. 1 товар в одни руки. При этом далеко не у всех резерв завершается успешно как у вас.
Теперь вопрос, что будет с вашей системой, когда в неё прилетит под 10к RPS?

alexfilus
13.10.2025 09:38А партиционирование по складам тут не поможет?

onets
13.10.2025 09:38В реальном мире все сложнее, клиент в Москве, а два склада в Москве и Новосибирске - насколько будет дороже и дольше доставка?
Поэтому хорошо бы сначала выбрать оптимальный склад.

gandjustas Автор
13.10.2025 09:38Вы наверное не до конца поняли проблему.
Чтобы сделать резервацию надо :
Найти строку в бд
Дождаться снятия блокировки обновления
Повесить свою блокировку обновления
Записать новую строку
И повторить это от 1 до 10 раз (сколько строк в заказе). И только в самом конце снимаются блокировки.
То есть следующая транзакция скорее всего зависнет на строке 2 на относительно долгое время. И транзакция за ней тоже будет висеть.
В тесте, когда выпадали дедлоки, я видел циклы ожиданий, которые состояли из 9 транзакций.
Блокировка происходит не на диапазонах, не на очереди записи на диск, а просто на ожидании снятия блокировки ОДНОЙ строки в один момент времени. Ни партицирование, ни шардирование тут не поможет.
Такова цена ACID гарантий при высокой конкурентности. И это самый лучший вариант на самом деле, все альтернативы еще более медленные, как в примере с однопоточной очередью.

alexfilus
13.10.2025 09:38В случае блокировок на 1 строке тут действительно вряд ли можно что-то придумать. Но такое партиционирование немного снизило бы накладные расходы на поиск и управление блокировками.

jakobz
13.10.2025 09:38Вместо самодельных очередей, надо вешать rate limiter на входе. Он поставит http-запросы в очередь, и пустит в параллель сколько скажешь.
Тогда, если разные куски когда будут бороться за коннекты, не получится что у тебя куча запросов ждет освободившегося коннекта чтобы завершится, а новые все прилетают - и тоже получают шанс сожрать коннект.

gandjustas Автор
13.10.2025 09:38Rate Limiter же обычно
429возвращает, а не ждет выполнения. Непонятно насколько это лучше, чем клиент просто подождет
onets
13.10.2025 09:38А походу особо и не заморачиваются, как-то был случай с покупкой видеокарты на DNS - все просто подвисло, а потом когда отвисло, они кончились.

jakobz
13.10.2025 09:38Когда заморачиваются - картинку на cdn включают, типа «вас слишком многа», чтобы часть трафика убрать, и хоть для кого-то работать. Когда все подвисает - это как раз когда не подумали, и дали завалить всё и всем.

jakobz
13.10.2025 09:38Там есть очередь же. Настраиваешь чтобы у тебя запросы не запускались в обработку больше, скажем, 10 одновременно. Остальные ждут в очереди сколько скажешь. И дальше крутишь ручки на перф-тесте. Обычно надо чтобы уперлось в CPU базы. Если под недонагружен по cpu, а база еще дышит - можно побольше в параллель навалить.
База, от того что ей в параллель надо кучу запросов вертеть, быстрее не работает - у нее тоже не бесконечное количество процов. Ей быстрее последовательно все делать плюс-минус по очереди.
Ну и исключается гонка за коннектами, сотни запросов ждущих блокировки и коннекты к БД. Все что система не переварит - видно по растущей очереди.

perfectdaemon
13.10.2025 09:38В оригинальной статье рассматривалась проблема высокой конкурентности за одну строку (один и тот же товар — условную сковородку, которую хотят купить все) и RPS описан как «тысячи» (но точного числа нет), а у вас вышло 700 RPS на разные товары.
Не умаляю ваши заслуги и приведенный код, но кажется, что ответ выше неполный :)

gandjustas Автор
13.10.2025 09:38Во-первых в статье написать можно любую цифру
Во-вторых у меня код выдает 600 заказов в секунду и 300+ обновлений остатков в секунду на каждый товар. Я могу сделать по 100 товаров и 100 складов, распределив равномерно запросы между нему, тогда количество запросов в секунду будет около 2000, а количество броней на один товар в секунду - меньше 100.
В-третьих у меня код работает на одной машине, в docker в wsl вместе со средой разработки и тестирования. Мы же не знаем какое железо было в оригинальной статье.

Dhwtj
13.10.2025 09:38ALTER TABLE stock ADD CONSTRAINT check_reserved CHECK (reserved <= quantity);
То есть перенесем один из инвариантов в СУБД
Убирает необходимость ручной проверки через AnyAsync(...)
Ещё можно
var stockRows = await ctx.Stock .FromSqlRaw(@" SELECT * FROM stock WHERE (item_id, warehouse_id) IN ({0}) ORDER BY item_id, warehouse_id FOR UPDATE", string.Join(",", keys.Select(k => $"({k.ItemId},{k.WarehouseId})"))) .ToListAsync(ct);Полностью устраняет дедлоки, но это уже вышел за пределы ORM

onets
13.10.2025 09:38Вот тут поясню (у меня такая же задача, только у меня тут не озон конечно).
Конфигурация - MySql 8.0 / i7-8700k / 32Gb, на одном складе, 10 items, 100 потоков, 1000 requests
Вариант, когда я беру из БД с SELECT FOR UPDATE, обновляю в C# коде и потом сохраняю в БД - в логах консольки деадлоки все равно есть, retry в EF Core срабатывает, цифры в конце сходятся, скорость примерно 20 rps
Вариант с хранимкой с SELECT FOR UPDATE, в логах консольки деадлоки есть, retry в EF Core срабатывает, цифры в конце сходятся, скорость примерно 60-90 rps
Вариант с хранимкой БЕЗ SELECT FOR UPDATE + без проверки остатка в конце, в логах консольки пусто, цифры в конце сходятся, скорость 400-440 rps
Вариант с хранимкой БЕЗ SELECT FOR UPDATE + с проверкой остатка в конце, в логах консольки пусто, цифры в конце сходятся, скорость 300 rps
И что еще заметил - последние два варианта отрабатывают нормально даже на уровне ReadUncommitted. Меня это смущает, пока не понимаю почему.
Позже пожалуй поставлю postgresql и проверю тоже самое на нем, интересно узнать разницу между дефолтным mysql и дефолтным postgresql
alexfilus
Для избегания дедлоков ещё есть трюк с сабселектом примерно такого вида
тогда блокировки будут браться именно в порядке сортировки
gandjustas Автор
Увы даже такой вариант у меня теряет обновления при использовании
UNNEST. Я все что можно перепробовал.А во всех остальных случаях код с
ORDER BY ... FOR UPDATEработает не быстрее триггеров. НоFOR UPDATEнельзя написать на EF, только SQL.alexfilus
Именно в сочетании с unnest? У нас такое в проде несколько лет уже крутится, проблем не замечено.
gandjustas Автор
Именно с ним, без repeateable read получаю потерю обновлений при конкурентных обновлениях.