redb
redb

Проблема

У вас не 5 микросервисов — у вас десятки. Бэкенд, который рос три года: монолит, расколотый на куски, GPS-фид от автопарка, мобильное приложение водителя, веб-кабинет диспетчера, интеграции с SAP / 1С / регуляторами / маркетплейсами, отдельный SMTP-воркер, отдельный PDF-генератор, отдельный шедулер ночных пересчётов. Между ними — Kafka (несколько кластеров, по топику на домен), RabbitMQ (RPC + pub/sub + DLQ), Redis (кэш, last-known-state, pub/sub-каналы), пара HTTP-эндпоинтов наружу, SFTP с поставщиком, SQL-polling outbox-таблицы старого монолита, MQTT с трекеров, IBM MQ для одного древнего банковского контура, SignalR-хабы для real-time-дашбордов. На каждом стыке — свой ретрай, свой DLQ (или нет DLQ), своя сериализация, свои метрики (или нет метрик), своя бойлерплейт-обвязка из консьюмеров и try/catch.

Каждый из этих стыков живёт своей жизнью в Program.cs соответствующего сервиса. Каждый — это hand-rolled цикл:

// Где-то в OrdersService:
var consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();
consumer.Subscribe("orders");
while (!stoppingToken.IsCancellationRequested)
{
    try
    {
        var msg = consumer.Consume(stoppingToken);
        var dto = JsonSerializer.Deserialize<OrderDto>(msg.Value);
        if (dto.Type != "new") continue;          // filter

        var retries = 0;
        while (true)                              // retry
        {
            try { await PublishToRabbit(dto); break; }
            catch (Exception ex) when (retries++ < 3)
            { await Task.Delay(TimeSpan.FromSeconds(retries * retries)); }
        }
        consumer.Commit(msg);                     // ack
    }
    catch (ConsumeException ex) { _logger.LogError(ex, "kafka"); }
    catch (Exception ex)        { _logger.LogError(ex, "send"); }
}

И таких 80 строк — на каждое направление. У каждого — свой ретрай, свой DLQ (или нет DLQ), своя сериализация, свои метрики (или нет метрик). На code review это «ну да, чё». Через год на онбординге нового разработчика — «а где тут вообще что лежит».

На JVM эту задачу 20 лет назад решили Apache Camel и более общий Enterprise Integration Patterns (книжка Gregor Hohpe и Bobby Woolf, та самая «жёлтая»). Идея простая: маршрут описывается как pipeline From → Process → To, EIP-паттерны (Splitter, Aggregator, Content-Based Router, WireTap, Dead Letter Channel, Idempotent Consumer, Saga) — first-class элементы DSL. Транспортов — 300+: от Kafka до LDAP, от S3 до IBM MQ.

В .NET этого не было.

MassTransit, NServiceBus, Wolverine — отличные message bus'ы, но это не одно и то же. У них 4–7 транспортов (Kafka, RabbitMQ, Azure SB, SQS, иногда SQL), фокус — durable saga + handler classes. EIP-каталог они закрывают на 3-4 паттерна (Saga, Request/Response, Outbox). А когда нужно «забери XML из SQL stored proc → распарси → положи в RabbitMQ → продублируй в SFTP-архив с retention 30 дней» — пишешь руками всю эту классическую кашу из консьюмеров, продюсеров и try/catch.

И отдельный поворот, который случился буквально сейчас: MassTransit v9 стал коммерческим. Проект переехал к новой компании Massient, Inc., на главной новой документации masstransit.massient.com кнопка «Get a License», в доках — «Configure the license key», Customer Support и Usage Telemetry. Дословно с их сайта: «Massient is the new company behind the commercial release of MassTransit v9.» То есть из «трёх .NET-альтернатив» две — платные (NServiceBus давно, MassTransit v9 теперь), остаётся Wolverine на MIT — и он, опять-таки, не ESB, а mediator + saga, без двух десятков транспортов и без EIP-каталога. Свободного Camel-уровня инструмента под .NET до сих пор не было.

Хотелось: описать маршрут как pipeline на C#, чтоб библиотека сама знала про Kafka, RabbitMQ, SQL, SFTP, HTTP, MQTT, retry, DLQ, transactional ack, и при этом был полный набор EIP-паттернов из канонической книжки.

Написали. Выложили под Apache 2.0.


Production case

Этот текст — не теория. redb.Route — это ESB-каркас уровня Apache Camel, и мы это утверждаем не на бумаге: 22 внешних транспорта + 5 встроенных компонентов, 30+ EIP-паттернов как first-class элементы DSL, compiled expression engine на System.Linq.Expressions, transactional pipelines с ITransactedAction (Kafka EOS, RabbitMQ publisher confirms + tx channels, IBM MQ syncpoint, AMQP 1.0, SQL via TransactionScope), persistent IdempotentConsumerSaga с компенсирующими шагами, CircuitBreakerThrottleAggregatorResequencerScatter-GatherRecipientListDynamicRouterEnrich/PollEnrichClaim Check, OpenTelemetry на каждый шаг, runtime-контейнер с hot-reload и кластером без внешнего координатора. 351 проходящий тест, включая отдельные DSL reference-suites под Camel-семантику Choice/When/Otherwise, TryCatchFinally и Filter (~1500 строк только в спецификациях). Это не «фреймворк-обёртка над IHostedService», это полноценный ESB.

И он работает в проде. Первый — крупная логистическая компания (~150k заказов/мес, ~20k B2B-клиентов, собственный автопарк, 600+ городов). Внутренняя TMS — ~500 водителей + ~50 диспетчеров, 3-нодовый кластер (Xeon, 4 ядра / 8 ГБ / 50 ГБ SSD на ноду), ~3 месяца стабильной работы, 10–15% CPU под полной нагрузкой.

Через redb.Route туда заведены: SAP S/4 (через SQL polling stored procedure → XML → redb), Kafka (фид с GPS-трекеров), RabbitMQ (внутрисервисная шина событий), HTTP API для UI диспетчера, Меркурий / ЕГАИС / Честный ЗНАК / ФГИС Зерно (российские регуляторы), LDAP/AD для авторизации, cron-backup через redb.Export с ротацией. Один InitRoute-файл регистрирует 22 RouteBuilder-класса. Каждый — изолированный pipeline, со своими OnException-обработчиками, своим RouteId, своими метриками OpenTelemetry.

Второй прод — платформа доставки того же заказчика, десятки микросервисов в одном репозитории (~37 .NET-проектов, ~50+ RouteBuilder-классов). Модули общаются через RabbitMQ (RPC + pub/sub), внутренний event-bus, Kafka с Consumer Group для GPS-стрима с мобилок, Redis для last-known-location-кэша, RabbitMQ-DLQ под poison messages, и всё это завязано на единый ESB-каркас. То есть это не «5 микросервисов с REST API» — это десятки независимо деплоящихся модулей, у каждого свой IRouteContext, свой набор endpoint'ов, свой lifecycle. Сравнимая по сложности картинка на JVM была бы — Apache Camel + Karaf. У нас — redb.Route + Tsak.

И всё это — в трёх стендах: dev / test / prod, каждый — 3-нодовый кластер. То есть RouteBuilder-классы, которые вы видите дальше по статье, прямо сейчас крутятся в девяти инстансах Tsak параллельно: три ноды поднимают одинаковые контексты, redb-координатор раздаёт между ними кластерные маршруты (Kafka Consumer Group — балансировкой партиций, RabbitMQ — конкурентные consumer'ы на той же очереди, HTTP-фасады — за L4-балансером, cron — leader-election через redb-объекты, чтобы крон-задача отработала ровно один раз на кластер). Дев-стенд — для разработки и feature-веток, test — приёмочные сценарии и регресс перед релизом, prod — собственно нагрузка. Между стендами разъезжается тот же .tpkg-пакет: меняется только конфиг и строка подключения к БД. Cluster + hot-reload + три параллельных стенда — это и есть «production-ready», а не «у меня на ноуте работает».

Третий прод — аналитическая платформа (~672k объектов, ~8M свойств), там Route играет роль внутренней шины между модулями. Реальный код всех трёх — ниже.

И это не «локальные сборки из репозитория». Вся стопка 3.0.0 опубликована на nuget.org — 43 пакета под префиксом redb.*, суммарно ~20 800 загрузок на момент написания статьи. По убыванию: redb.Route (~1600), redb.Core (~1590), redb.Core.Pro (~1160), redb.Templatesredb.Postgres / redb.MSSql (~1000 каждый), дальше Pro-провайдеры, redb.CLIredb.Export, и весь зоопарк redb.Route.* транспортов (Kafka, RabbitMQ, IBM MQ, MQTT, AMQP, Redis, gRPC, SignalR, WebSocket, TCP, SFTP, FTP, File, S3, Mail, LDAP, Quartz, Elasticsearch, AzureServiceBus, Firebase, Http, Sql) + redb.Tsak.* (Core, Core.Pro, Client, CLI, Contracts, Templates). То есть всё, про что эта статья — реально лежит, версионируется и подтягивается через dotnet add package.


Как выглядит маршрут

Чтобы не врать игрушечным From → Filter → To, сразу боевой shape. Это упрощённый набросок из лога-трекинговой системы: HTTP-фасад принимает массив GPS-точек, throttle на 200 rps, идемпотентность по MessageId, валидация JSON, ветвление по типу события, Multicast параллельно в три транспорта (Kafka на агрегацию, RabbitMQ на DAL-запись, Redis-кэш last-known-location фоном через WireTap), а наружу — JSON-ответ:

public class GpsHttpFacadeRoutes : RouteBuilder
{
    protected override void Configure()
    {
        OnException<JsonException>()
            .Handled()
            .SetHeader("HTTP_SC", Constant(400))
            .To("direct://error-handler")
        .EndOnException();

        From("http://0.0.0.0:5090/integration/gps?methods=POST")
            .RouteId("gps-http-facade")
            .Throttle(200).Per(TimeSpan.FromSeconds(1))                      // не больше 200 rps
            .IdempotentConsumer(Header("MessageId"), repository: "redb")     // дубли отсекаем
            .Process(ValidateGpsBatch)                                       // JSON → List<GpsPoint>
            .Choice()
                .When(Header("eventType").IsEqualTo("location"))
                    .Multicast().ParallelProcessing()
                        .To("kafka:mobile.trips.location.sync?key=${header.tripId}")
                        .To("rabbitmq:?exchange=lt.dal&routingKey=dal.trips.location.sync")
                        .WireTap("redis:set:gps:last:location:${header.tripId}?ttl=86400")
                    .EndMulticast()
                .When(Header("eventType").IsEqualTo("checkin"))
                    .To("rabbitmq:?exchange=lt.dal&routingKey=dal.gps.checkin_checkout")
                .Otherwise()
                    .Log(LogLevel.Warning, "unknown eventType: ${header.eventType}")
            .EndChoice()
            .SetBody(Constant("""{ "status": "accepted" }"""))
            .SetHeader("Content-Type", Constant("application/json"));
    }
}

Этот один класс заменяет: HTTP-контроллер, middleware-throttle, ручную идемпотентность через Redis, ручной Kafka-продюсер, ручную RabbitMQ-обвязку, try/catch поверх десериализации и логику дёргания Redis из background-сервиса. Всё, что внутри — first-class элементы DSL: ThrottleIdempotentConsumerChoiceMulticastWireTapOnException<T>. Каждый из них — отдельный EIP-паттерн с собственными метриками OpenTelemetry.

Кстати, именно такой Route-фасад в одном из продов сейчас замещает WSO2 Micro Integrator, который до этого играл роль HTTP→Kafka/RabbitMQ-шлюза. WSO2 даёт ту же машинерию через XML-конфиги и Synapse-DSL — но когда вокруг уже C#-стек, держать отдельную JVM ради <switch>/<foreach parallel-execution="true">/<endpoint uri="rabbitmq:/?..."> — лишнее звено. Route даёт те же EIP, но в одном процессе, на одном языке, с C#-типизацией и общими IServiceProvider-зависимостями.

Регистрация:

// Program.cs
builder.Services.AddRedbRoute(route =>
    route.AddRouteBuilder<GpsHttpFacadeRoutes>());
builder.Services.AddRedbRouteHttp();
builder.Services.AddRedbRouteKafka();
builder.Services.AddRedbRouteRabbitMQ();
builder.Services.AddRedbRouteRedis();

Маршрут стартует вместе с приложением, останавливается на shutdown с graceful drain, метрики OpenTelemetry — по каждому шагу из коробки (route.gps-http-facade.durationmulticast.to[1].failuresidempotent.duplicatesthrottle.rejected).

Полный код такого аггрегатора (только с Kafka на входе, без HTTP) — ниже, в разделе «Второй прод — RabbitMQ как RPC и pub/sub шина». А кому нужен hello world на 15 строк — он в redb.Route.Demo.


Чего хотел Camel — и что из этого есть

EIP-паттернов из канонической книжки в Camel — 80+. Мы закрыли 30+ — самые ходовые. Все они — first-class элементы DSL:

// Content-Based Router
.Choice()
    .When(Header("priority").isEqualTo("high")).To("direct://fast-lane")
    .When(Header("priority").isEqualTo("low")).To("seda://batch")
    .Otherwise().To("direct://standard")
.EndChoice()

// Splitter — каждое вложение обрабатываем отдельно
.Split(e => e.In.GetBody<Order>().Items)
    .Process(async (e, ct) => await ProcessItemAsync(e, ct))
.EndSplit()

// Aggregator — собираем по correlationId
.Aggregate(Header("orderId"),
    strategy: new CompletionAggregator(),
    completionSize: 10,
    completionTimeout: TimeSpan.FromSeconds(30))

// WireTap — отправить копию в audit без блокировки основного потока
.WireTap("seda://audit")

// Recipient List — динамический fan-out
.RecipientList(e => ResolveDestinations(e))

// Idempotent Consumer — дедупликация по ключу (persistent backend)
.IdempotentConsumer(Header("messageId"), repository: redbRepository)

И ещё: MulticastDynamic RouterResequencerScatter-GatherClaim CheckThrottleDelayLoopEnrichPollEnrichSaga (без durable state — компенсирующие шаги), Circuit BreakerValidateMarshal/UnmarshalTransactedProcess. И стандартные обвязки — RetryDeadLetterChannelTryCatch/DoCatch/DoFinallyOnException.


Compiled expression engine

В Camel есть Simple Language — ${header.priority}${body.items.size()} и так далее. У него один минус: он интерпретируется в рантайме. На горячем пути это заметно.

У redb.Route есть то же самое, но компилируемое${header.priority}${body.OrderId}, арифметика, JSONPath, XPath — всё это парсится один раз при Build() маршрута, превращается в Expression<Func<IExchange, T>> через System.Linq.Expressions, компилируется в IL и кэшируется. На горячем пути — обычный делегат.

// Предикаты в Choice.When / Filter — три формы:
.When(e => e.In.GetHeader<string>("priority") == "high")   // 1. lambda
.When(Header("priority").isEqualTo("high"))                // 2. typed builder
.When("header.priority == 'high'")                         // 3. string expression

// String templates в SetBody / SetHeader / Log:
.SetHeader("reply", "${header.orderId}-confirmed")
.SetBody("Order ${body.Id}: total=${body.Total} for ${header.customerName}")
.Log("Processed ${header.orderId} in ${header.elapsed}ms")

Можно смешивать. Internally — один и тот же AST с одним и тем же компилятором.


Транспорты

22 внешних + 5 встроенных:

Категория

Транспорты

Очереди / шины

Kafka, RabbitMQ, IBM MQ, MQTT (5.0), AMQP 1.0, Azure Service Bus, Redis

HTTP / RPC

HTTP (in/out), WebSocket, SignalR, gRPC, TCP

Файлы / хранилища

SFTP, FTP, File, S3, Firebase (Firestore + Cloud Storage + FCM), GenericFile (base)

Базы данных

SQL (polling outbox-style)

Корпоративные

LDAP / Active Directory, Mail (SMTP/IMAP/POP3), Elasticsearch 8.x

Планировщик

Quartz, Cron

Встроенные

Direct, SEDA, Timer, Mock, Log

Каждый — отдельный NuGet-пакет (redb.Route.Kafkaredb.Route.RabbitMQ и т.д.). Подключаются явно через AddRedbRoute*(). У большинства — fluent builder сверху URI-строки:

// URI form
From("kafka://orders?groupId=svc&brokers=broker1:9092,broker2:9092&autoOffsetReset=earliest");

// Type-safe builder — то же самое
From(Kafka.Topic("orders")
    .Brokers("broker1:9092", "broker2:9092")
    .GroupId("svc")
    .AutoOffsetReset(AutoOffsetReset.Earliest)
    .Acks("All"));

Builder лучше тем, что (а) intellisense, (б) опечатки ловятся компилятором, (в) refactor → rename работает.


Обработка ошибок — четыре уровня

Самое больное в hand-rolled-интеграциях. У Route — четыре уровня, которые комбинируются:

1. Per-step Retry — локальный retry вокруг одного шага. Когда внешний сервис флакающий, но в целом живой:

.Retry(maxRetries: 5, initialDelay: TimeSpan.FromSeconds(1))
.To("http://flaky-service/submit")

2. DeadLetterChannel — route-level. Любая необработанная ошибка после retry уходит в указанный sink. Exception сохраняется на exchange, можно прочитать в DLC-маршруте:

From("kafka://orders")
    .DeadLetterChannel("seda://dlq")
    .Process(...)
    .To("sql://orders");

From("seda://dlq")
    .Log("DLQ: ${exception.message}")
    .Choice()
        .When(e => e.GetException() is TimeoutException).To("seda://retry-later")
        .Otherwise().To("sftp://archive/failed/")
    .EndChoice();

3. DoTry / DoCatch / DoFinally — scoped, как обычный try/catch внутри маршрута:

.DoTry()
    .To("http://external-api/submit")
    .Process(async (e, ct) => await PostProcess(e, ct))
.DoCatch<HttpRequestException>()
    .Log("HTTP failure: ${exception.message}")
    .To("seda://retry-queue")
.DoCatch<TimeoutException>()
    .Log("Timeout, archiving")
    .To("sftp://archive/timeouts/")
.DoFinally()
    .Log("Attempt complete (success or failure)")
.End()

4. OnException<T> — на уровне всего контекста. Глобальные обработчики: регистрируются в IRouteContext и применяются ко всем маршрутам контекста, а не только к From(...) из этого класса. Объявил один раз в любом RouteBuilder — ловит исключения по всем From(...) всех RouteBuilder-ов этого модуля:

public class OrderRoutes : RouteBuilder
{
    protected override void Configure()
    {
        OnException<HttpRequestException>()
            .MaximumRedeliveries(5)
            .UseExponentialBackOff()
            .BackOffMultiplier(2.0)
            .Handled()                  // exchange продолжает идти как ни в чём
            .To("seda://http-failures")
        .EndOnException();

        OnException<DbException>()
            .MaximumRedeliveries(2)
            .UseOriginalMessage()       // восстановить тело до обработки
            .To("seda://db-failures")
        .EndOnException();

        // Эти обработчики ловят исключения не только тут, а во всех
        // RouteBuilder-ах модуля — они живут на уровне контекста:
        From("kafka://orders").To("http://payments-svc/charge");
        From("kafka://shipments").To("http://logistics-svc/dispatch");
    }
}

.Handled().Continued().UseExponentialBackOff().UseOriginalMessage() — стандартные крутилки Camel'а, перенесённые один-в-один.


Transactional routes — что под капотом

.Transacted() — это не просто метка «оберни в TransactionScope». Это контракт с транспортом: при успехе всего pipeline сделай commit/ack, при failure — rollback/nack.

From("kafka://orders?groupId=svc&brokers=...")
    .Transacted()                          // открывается транзакция
    .ProcessWithRedb(async (redb, e, ct) => await SaveOrderAsync(redb, e, ct))
    .To("rabbitmq://orders-confirmed");    // подтверждается в RabbitMQ
                                           // commit Kafka offset + commit RabbitMQ + commit redb — атомарно

Что реально происходит:

  • Kafka-транспорт включает transactional producer (enable.idempotence=truetransactional.id), консьюмер коммитит offset через producer.SendOffsetsToTransaction(...) в той же транзакции. Это Kafka EOS — exactly-once между partition'ами.

  • RabbitMQ-транспорт включает publisher confirms (MaxOutstandingConfirms), плюс при .Transacted() — transacted channels (tx-select / tx-commit / tx-rollback).

  • IBM MQ, AMQP 1.0 — нативные транзакции через MQGMO_SYNCPOINT/local transactions.

  • SQL-транспорт биндит IDbTransaction к каждому шагу через TransactionScope (распределённую — для SQL Server, локальную — для PostgreSQL и MySQL).

Под капотом — интерфейс ITransactedAction, который каждый транспорт реализует по-своему. С точки зрения вашего кода — одна строчка .Transacted().


Реальный код из прода

Эти куски выдрал из боевого репозитория той самой внутренней TMS. Имена методов и переменных как есть.

InitRoute — точка входа модуля Tsak. Регистрируются компоненты (транспорты), redb-схемы, словари, и 11 RouteBuilder-классов:

public static class InitRoute
{
    public static IRouteContext main(IRouteContext context)
    {
        // 1. Транспорты модуля
        context.AddComponent(new SqlComponent());
        context.AddComponent(new HttpComponent { ServerManager = new SharedHttpServerManager() });
        context.AddComponent(new CronComponent());

        // 2. Named SQL data source — connection string в DI, не в URI маршрута
        DbProviderFactories.RegisterFactory("Microsoft.Data.SqlClient", SqlClientFactory.Instance);
        context.AddToRegistry("sap-s4", (ISqlConnectionFactory)new SqlConnectionFactory(
            new SqlConnectionOptions
            {
                ConnectionString = sapConn,
                ProviderName = "Microsoft.Data.SqlClient"
            }));

        // 3. Синхронизация redb-схем — code-first, без миграций
        var redb = context.GetRedbService();
        redb.InitializeAsync(ensureCreated: true).GetAwaiter().GetResult();
        redb.SyncSchemeAsync<Driver>().GetAwaiter().GetResult();
        redb.SyncSchemeAsync<Vehicle>().GetAwaiter().GetResult();
        redb.SyncSchemeAsync<TransportationRoute>().GetAwaiter().GetResult();
        redb.SyncSchemeAsync<TransportationPoint>().GetAwaiter().GetResult();
        // ... ещё 8 схем

        EnsureTsUmLists(redb);
        await RefDataCache.RefreshAsync(redb);

        // 4. Маршруты
        var rc = (RouteContext)context;
        rc.AddRoutes(new TsumExceptionRouteBuilder());     // global OnException
        rc.AddRoutes(new TsumRouteBuilder());              // SQL polling от SAP S/4
        rc.AddRoutes(new TsumPlacementRouteBuilder());     // расстановка машин
        rc.AddRoutes(new TsumTransportStatusRouteBuilder());
        rc.AddRoutes(new TsumShippingPointRouteBuilder());
        rc.AddRoutes(new TsumBackupRouteBuilder());        // daily cron backup
        rc.AddRoutes(new TsumCleanupRouteBuilder());
        rc.AddRoutes(new TsumSliceJobRouteBuilder());
        // ... ещё 3

        return context;
    }
}

SQL polling из SAP S/4 stored procedure → XML → redb. Один из ключевых ETL-маршрутов:

private void ConfigureSqlConsumer()
{
    var procs = Context!.GetProperty<IDictionary<string, object?>>("Procedures")!;
    var proc = (string)procs["MonitoringReport"]!;   // имя процедуры — из конфига, по Mode (prod/test)

    var builder = Sql.Poll($"EXEC {proc} @DateFrom, @DateTo, @OutputFormat")
        .DataSource(Constant("sap-s4"))
        .CommandTimeout(300)
        .Delay(60000)                                // опрос раз в минуту
        .OutputType(SqlOutputType.Scalar)            // FOR XML возвращает одно значение
        .Param("DateFrom", (object?)null)
        .Param("DateTo", (object?)null)
        .Param("OutputFormat", "XML");

    From(builder)
        .RouteId("tsum-sql-consumer")
        .Process(e => e.In.ContentType = "application/xml")
        .Process(DeserializeXml)                     // XmlSerializer → List<TransportationOrder>
        .To("direct://tsum");
}

private void ConfigureHttpConsumer()
{
    // Тот же конечный обработчик, но фронт для тестов руками
    From("http:0.0.0.0:5089/api/tsum/monitoring?inOut=true")
        .RouteId("tsum-http-consumer")
        .ConvertBody<string>()
        .Process(DeserializeXml)
        .To("direct://tsum");
}

private void ConfigureProcessing()
{
    // Единая точка обработки — что из SQL, что из HTTP
    From("direct://tsum")
        .RouteId("tsum-processing")
        .ProcessWithRedb(async (redb, exchange, ct) =>
        {
            var orders = exchange.In.GetBody<List<TransportationOrder>>();
            if (orders is null || orders.Count == 0) return;

            // Синхронизация справочников водителей/машин/точек
            var dicts = await DictionarySyncService.SyncFromOrdersAsync(redb, orders, ct);
            if (dicts.AnyChanged) await RefDataCache.RefreshAsync(redb);

            // Дедупликация существующих маршрутов
            var codes = orders.Select(o => o.Code).ToList();
            var existing = await redb.Query<TransportationRoute>()
                .WhereRedb(o => codes.Contains(o.ValueString!))
                .ToListAsync();

            // ... ещё ~200 строк бизнес-логики
        });
}

Заметная штука: один и тот же обработчик (direct://tsum) обслуживает и SQL-источник, и HTTP-эндпоинт для ручных тестов. Это и есть тот самый Camel'овский трюк — direct: как in-process invariant'ный канал, к которому можно прицепить любой To(...).

HTTP API на порту 5090 с авторизацией через JWT + LDAP. Один из endpoint'ов:

private void ConfigureRoutesEndpoint()
{
    From("http:0.0.0.0:5090/api/tsum/routes?inOut=true&cors=true&corsOrigins=*")
        .RouteId("tsum-api-routes")
        .Process(Auth.ProcessAsync)                         // валидация JWT, наполнение headers
        .Process(TsumAuthProcessor.RequirePermission(TsumPermission.ViewAll))
        .ConvertBody<string>()
        .ProcessWithRedb(async (redb, exchange, ct) =>
        {
            var bodyStr = exchange.In.Body?.ToString() ?? "{}";
            var filter = JsonSerializer.Deserialize<RouteFilterRequest>(bodyStr, JsonOptions);

            // GET-параметры тоже принимаем — для отладки в браузере
            string? QP(string name) =>
                exchange.In.Headers.TryGetValue($"redbHttp.QueryParam.{name}", out var v)
                    ? v?.ToString() : null;

            if (QP("startPlanFrom") is { } qpFrom &&
                DateTimeOffset.TryParse(qpFrom, out var from))
                filter.StartPlanFrom ??= from;
            // ... ещё несколько query-параметров

            var query = redb.Query<TransportationRoute>();

            // Динамическая фильтрация через redb LINQ
            if (!string.IsNullOrEmpty(filter.CodeSearch))
                query = query.WhereRedb(o => o.ValueString!.Contains(filter.CodeSearch));

            var spItems = await ResolveIds(filter.ShippingPointIds);
            if (spItems.Count > 0)
                query = query.Where(r => spItems.Contains(r.ShippingPoint));

            // ... ещё ~10 фильтров и пагинация
            var items = await query.Skip(offset).Take(limit).ToListAsync();
            JsonRouteHelper.SetJsonBody(exchange, new { total, offset, limit, items });
        });
}

HTTP-транспорт здесь — это redb.Route.Http с собственным Kestrel-консьюмером. inOut=true — request/response режим, ответ возвращается в Body exchange'а. Заголовки redbHttp.QueryParam.* парсятся транспортом автоматически.

Cron backup всей БД через redb.Export, с ротацией. Без сторонних шедулеров:

protected override void Configure()
{
    var backupConfig = Context!.GetProperty<IDictionary<string, object?>>("Backup");
    var directory = (string)backupConfig["Directory"]! ?? "backups";
    var retentionDays = int.Parse(backupConfig["RetentionDays"]?.ToString() ?? "7");

    Context.SetProperty("_backup.directory", directory);
    Context.SetProperty("_backup.retentionDays", retentionDays);

    From("cron://tsum-backup?schedule=0 0 3 * * ?")    // каждый день в 03:00
        .RouteId("tsum-backup-cron")
        .ProcessWithRedb(RunBackupAsync);
}

private async Task RunBackupAsync(IRedbService redb, IExchange exchange, CancellationToken ct)
{
    var pgConn = redb.Configuration.ConnectionString!;
    var directory = Context!.GetProperty<string>("_backup.directory")!;

    Directory.CreateDirectory(directory);
    var fileName = $"tsum_backup_{DateTime.UtcNow:yyyy-MM-dd_HHmmss}.redb";
    var filePath = Path.Combine(directory, fileName);

    var provider = ProviderFactory.Create("postgres");
    await provider.OpenAsync(pgConn, ct);
    var exportService = new ExportService(provider, verbose: false, batchSize: 10000);
    await exportService.ExportAsync(filePath, schemeIds: null, compress: true, dryRun: false, ct);

    RotateBackups(directory, retentionDays);
}

Транспорт cron:// — это redb.Route.Quartz с Quartz.NET под капотом. Cron-expression стандартный, Quartz-овский.

Если посмотреть на эти три фрагмента — там нет ни одной строки про инициализацию консьюмеров, про работу с IServiceProvider, про lifecycle, про graceful shutdown. Всё это сидит внутри Route. Бизнес-код — это Process(...) и ProcessWithRedb(...).


Второй прод — RabbitMQ как RPC и pub/sub шина

Те куски выше про TMS — это SQL + HTTP + cron. Чтобы не складывалось впечатление «redb.Route только под polling-задачи», вот фрагменты из второй продовой системы того же заказчика — платформы доставки на ~150k заказов/мес. Модули общаются друг с другом через RabbitMQ: одни маршруты — синхронный RPC поверх очередей с CorrelationId, другие — fire-and-forget pub/sub. Старая версия кода была на snake-case-форке нашего DSL; сейчас всё работает на актуальном 3.0 — привожу к каноническому API.

RPC через RabbitMQ — запрос/ответ с CorrelationId. Модуль ControlPanel пишет в очередь dal.controlpanel.trips.get.tracking.link, ждёт ответ с тем же CorrelationId. На стороне DAL:

public class TripLinkRouteBuilder : RouteBuilder
{
    protected override void Configure()
    {
        From(Rabbit.Queue("dal.controlpanel.trips.get.tracking.link")
                   .ConnectionFactory("RabbitMQConnectionFactory"))
            .RouteId("controlpanel_getTrackingLink")
            .SetHeader("routeId", "controlpanel_getTrackingLink")

            // Валидация: без CorrelationId роутить нечего
            .Filter(Header("CorrelationId").IsNull())
                .DoThrow<ArgumentNullException>(
                    "CorrelationId is required. RouteId='controlpanel_getTrackingLink'")
            .EndFilter()

            .Log("GetTrackingLink DAL STARTED: CorrelationId=${header.CorrelationId}")

            .InvokeController()                            // → TripController.GetTrackingLink
            .DbSaveChanges()                               // commit EF Core changes
            .ApiPackResponse(                              // упаковка в стандартный envelope
                toData:  Property("data"),
                toMeta:  Property("meta"),
                code_dal: 200,
                packJwt: true)

            .WireTap("direct://delivery.info.send")        // асинхронная нотификация — fire-and-forget
            .Log("GetTrackingLink DAL COMPLETED: CorrelationId=${header.CorrelationId}")

            .Respond();                                    // ответ на reply-to queue с тем же correlationId
    }
}

Rabbit.Queue(...).ConnectionFactory(...) использует named-фабрику из DI-registry (как named SQL data source в первом проде — секреты не торчат в URI маршрута). .Respond() — это RabbitMQ RPC-pattern: транспорт сам читает reply-to и correlation-id из заголовков входящего сообщения и публикует тело exchange'а обратно. Никакого ручного BasicProperties.ReplyTo / BasicProperties.CorrelationId в коде.

Глобальный OnException на весь RouteBuilder — стандартный envelope ошибки + 500:

protected override void Configure()
{
    if (!HasExceptionRoute<Exception>())
        OnException<Exception>()
            .Log("Exception: ${exception.message}", LogLevel.Error)
            .SetHeader("code_dal", (int)HttpStatusCode.InternalServerError)
            .SetBody(e => new BaseErrorResponse
            {
                traceId     = e.In.GetHeader<string>("CorrelationId") ?? "no-correlation",
                code        = (int)HttpStatusCode.InternalServerError,
                message     = e.Exception?.Message ?? "Internal server error.",
                description = e.Exception?.ToString()
            }.ToJson())
            .ExceptionHandled()
            .Stop()
        .EndOnException();

    // ConfigureGetTrackingLinkRoute() ... остальные routes выше
}

Pub/sub через RabbitMQ — email-нотификации. Любой модуль может бросить exchange'у lt.dal с routing key send.email — отдельный SMTP-воркер (отдельный процесс, отдельный модуль Tsak) подберёт и отправит. Локальная точка входа — direct://send-trip-email-notification, в неё пишут все RouteBuilder'ы, которым нужно уведомить:

public class EmailNotificationRouteBuilder : RouteBuilder
{
    protected override void Configure()
    {
        From("direct://send-trip-email-notification")
            .RouteId("dal_sendTripEmailNotification")
            .Log(LogLevel.Debug,
                "SendTripEmailNotification: tripId=${property.tripId}, type=${property.emailType}")

            .Process((e, ct) =>
            {
                // Изолированный scope под scope'd DbContext (route отрабатывает в WireTap-таске)
                using var scope = Context.GetServiceProvider()!.CreateScope();
                var db     = scope.ServiceProvider.GetRequiredService<LtContext>();
                var logger = Context.GetService<ILogger>();
                var cfg    = Context.GetProperty<IDictionary<string, object?>>("EmailNotifications");

                if (!IsEnabled(cfg)) { e.In.SetHeader("code_dal", 204); return; }

                var tripId = e.GetProperty<long>("tripId");
                var trip   = db.Trips.AsNoTracking()
                    .Include(t => t.Driver).ThenInclude(d => d!.IdNavigation).ThenInclude(a => a.Branch)
                    .Include(t => t.Trippoints).ThenInclude(tp => tp.Location)
                    .FirstOrDefault(t => t.Id == tripId);

                if (trip?.Driver?.IdNavigation?.Branch is not { } branch)
                { e.In.SetHeader("code_dal", 404); return; }

                // Адресатов берём из настроек филиала — список email'ов в одной из двух колонок
                var emailType = e.GetProperty<string>("emailType");
                var recipients = EmailHelper.ParseAndValidateEmailList(
                    emailType == "trip-point-change"
                        ? branch.Trippointorderchangeemaillist
                        : branch.Tripcompletionemaillist);

                if (recipients.Count == 0)
                { e.In.SetHeader("code_dal", 204); return; }

                var (subject, html) = BuildEmailContent(trip, emailType, cfg, e);

                e.In.SetHeader("emailTo",          string.Join(",", recipients));
                e.In.SetHeader("emailSubject",     subject);
                e.In.SetHeader("emailContentType", "text/html");
                e.In.SetBody(new { content = html }.ToJson());
                e.In.SetHeader("code_dal", 200);
            })

            // Только если письмо реально готово к отправке — пушим в очередь
            .Filter(Header("code_dal").IsEqualTo(200))
                .To(Rabbit.Exchange("lt.dal", type: "topic")
                          .RoutingKey("send.email")
                          .ConnectionFactory("RabbitMQConnectionFactory"))
                .Log("Email sent to RabbitMQ exchange lt.dal/send.email")
            .EndFilter();
    }
}

Дальше эту direct-точку дёргают через WireTap из любого маршрута, который меняет поездку:

// Где-то в TripRouteBuilder, после успешного UPDATE
.SetProperty("tripId",        Header("trip-id"))
.SetProperty("emailType",     Constant("trip-point-change"))
.SetProperty("changeDescription", e => "Order added on point #" + e.In.GetHeader<int>("point-seq"))
.WireTap("direct://send-trip-email-notification")

WireTap копирует exchange и пушит копию в указанный endpoint асинхронно, не блокируя основной поток. Получатель — direct://... в этом же процессе, дальше — RabbitMQ. Если SMTP-воркер лежит — письма копятся в очереди, основной HTTP API продолжает отвечать.

Kafka — GPS-координаты с мобилок. Самый нагруженный маршрут той же системы. Водители (~500 одновременно) льют батчи координат через мобильное API → топик mobile.trips.location.sync. На каждой ноде кластера сидит один consumer; Kafka Consumer Group сам раздаёт партиции между нодами (3 партиции → 3 ноды, по одной на каждую). Дальше — группировка по tripId, последняя точка каждого tripId асинхронно уходит в Redis (last GPS location для веб-дашборда), полный батч рассылается по RabbitMQ-очередям DAL-модулей (одна — на запись в БД, вторая — на детекцию check-in/check-out, третья — на пересчёт метрик):

public class GpsKafkaBatchAggregatorBuilder : RouteBuilder
{
    protected override void Configure()
    {
        var cfg = Context.GetSection(GetType().Name);

        // Один consumer на ноду. Партиции (0..N) распределит Consumer Group.
        // partition= НЕ указываем — иначе сломаем балансировку.
        From(Kafka.Topic(cfg.GetValue("kafka.Topic", "mobile.trips.location.sync"))
                  .Brokers(cfg.GetValue("kafka.Brokers", "localhost:9092"))
                  .GroupId(cfg.GetValue("kafka.GroupId", "gps-batch-aggregator"))
                  .MaxPollRecords(cfg.GetValue("kafka.MaxPollRecords", 100))
                  .AutoOffsetReset("earliest"))
            .RouteId("GpsKafkaBatchAggregator")
            .Transacted()                                       // Kafka EOS: commit offset в одной tx с downstream
            .Log("incoming batch: kafka.batch.size=${header.kafka.batch.size}")
            .To("direct://gps-batch-grouper");

        From("direct://gps-batch-grouper")
            .RouteId("GpsBatchGrouper")
            .Process(new GpsBatchGrouperProcessor(outputType: GroupingOutputType.Dictionary))
            .Log("trips=${header.gps.trips.count}, points=${header.gps.total.points}")

            // Извлекаем последнюю координату по каждому tripId → property для Redis
            .Process((e, ct) =>
            {
                var grouped = e.In.GetBody<Dictionary<long, List<LocationHistory>>>();
                var last = grouped.ToDictionary(
                    kvp => kvp.Key.ToString(),
                    kvp => new GpsLocationDto(
                        ts:  kvp.Value.Max(l => l.ts).ToString("o"),
                        lat: kvp.Value.OrderByDescending(l => l.ts).First().lat,
                        lon: kvp.Value.OrderByDescending(l => l.ts).First().lon).ToJson());
                e.SetProperty("gpsLastLocationMap", last);
            })

            // Fire-and-forget в Redis — основной pipeline ждать не должен
            .WireTap("direct://save-gps-last-location")

            // Раздача батча по трём очередям DAL
            .Multicast().ParallelProcessing()
                .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.trips.location.sync"))
                .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.gps.checkin_checkout"))
                .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.metrics.calculate"))
            .EndMulticast();

        // Запись last-location в Redis (TTL 24 часа). Ключ: gps:last:location:{tripId}
        From("direct://save-gps-last-location")
            .RouteId("SaveGpsLastLocation")
            .Split(Property("gpsLastLocationMap"))
                .SetHeader("tripId", e => ((KeyValuePair<string, string>)e.In.GetBody()).Key)
                .SetBody(e =>            ((KeyValuePair<string, string>)e.In.GetBody()).Value)
                .To("redis:set:gps:last:location:${header.tripId}?ttl=86400")
            .EndSplit();
    }
}

Соседний consumer (mobile.trips.unplanned.stops) делает детекцию незапланированных остановок — там интересен локальный OnException. JSON, который не парсится — это не транзиентная ошибка, retry не поможет. Маршрут отправляет сообщение в RabbitMQ-DLQ с метаданными (partition, offset, preview оригинального body) и пробрасывает исключение наверх — consumer падает, Kafka откатывает offset, поднимается заново, и проблемная партиция уйдёт в DLQ при следующей попытке. Это правильный pattern для poison message:

From(Kafka.Topic("mobile.trips.unplanned.stops").GroupId("unplanned-stops-processor")
          .Brokers(brokers).AutoOffsetReset("latest"))
    .RouteId("UnplannedStop-Kafka")
    .Transacted()
    .To("direct://unplanned-stop-grouper");

From("direct://unplanned-stop-grouper")
    .RouteId("UnplannedStop-Grouper")

    .OnException<JsonException>()
        .MaximumRedeliveries(0)    // JSON не станет валидным от повтора
        .Handled(false)            // консьюмер ДОЛЖЕН упасть, чтоб Kafka не закоммитила offset
        .Process(e =>
        {
            e.In.SetHeader("dlq.errorType",     "JsonException");
            e.In.SetHeader("dlq.errorMessage",  e.Exception?.Message);
            e.In.SetHeader("dlq.partition",     e.In.GetHeader<int>("kafka.partition"));
            e.In.SetHeader("dlq.offset",        e.In.GetHeader<long>("kafka.offset"));
            e.In.SetHeader("dlq.rawBodyPreview",
                Truncate(e.GetProperty<string>("OriginalKafkaBody") ?? e.In.GetBody<string>(), 500));
        })
        .To(Rabbit.Exchange("lt.dlq").RoutingKey("dal.dlq.gps.json"))
        .Log(LogLevel.Error,
            "JSON failed → DLQ: partition=${header.kafka.partition}, offset=${header.kafka.offset}")
    .EndOnException()

    .Process(new UnplannedStopDetectorProcessor(cacheManager, lookbackMinutes: 60))
    .To("direct://persist-unplanned-stops");

Несколько штук, которые видно только на реальном проде:

  • partition= в Kafka URI указывать нельзя. Чуть выше прокомментировано в коде — это типичная грабля. Если указать — Consumer Group ломается, все ноды конкурируют за одну партицию.

  • .Transacted() + .Multicast() — батч уходит во все три очереди одновременно (ParallelProcessing), но Kafka offset коммитится только если все .To(...) отработали. Если RabbitMQ временно недоступен — батч приедет повторно, без потерь.

  • WireTap для Redis — last-location нужен на дашборде «здесь и сейчас», но если Redis лежит, основной pipeline на запись в БД не должен ждать. WireTap копирует exchange в отдельный поток.

  • DLQ с dlq.rawBodyPreview — когда инженер открывает DLQ-очередь, ему нужны не «что-то сломалось», а partition + offset + первые 500 символов исходного body. С этим можно прийти к мобильному разрабу и сказать «вот это вы шлёте».

Что важно увидеть на этом примере:

  • Один и тот же Rabbit.Queue(...) / Rabbit.Exchange(...) builder покрывает и RPC (с .Respond()), и pub/sub (с .To(...)) — разница только в форме хвоста маршрута.

  • InvokeController() / DbSaveChanges() / ApiPackResponse(...) — это не часть базового Route, это шаги-расширения, которые сделали внутри проекта (lt.Core.Route). Любой шаг — это IProcessor, регистрируется в IRouteContext, доступен через extension method. То есть DSL расширяемый: команде нужны свои first-class steps под «вызови контроллер → сохрани EF Core → упакуй ответ» — пишут их один раз, используют во всех маршрутах.

  • WireTap + direct:// — стандартный Camel-овский паттерн для fire-and-forget'а внутри процесса. Никаких Task.Run, никаких IHostedService-очередей под капотом — это всё в Route.


Если хочется потрогать руками — redb.Route.Demo

В репозитории лежит модуль redb.Route.Demo — 39 маршрутов в 9 секциях, 18 транспортов, всё в одном проекте, всё запускается через dotnet run. По сути — единый reference implementation, в котором есть каждая фича фреймворка: RPC через RabbitMQ/AMQP 1.0/gRPC/IBM MQ, WireTap в Kafka и файл, SQL+TransactionScope, Redis Pub/Sub, TCP echo, WebSocket push, MQTT, SEDA, DirectVM cross-context, Timer/Cron, CircuitBreakerRetry с backoff, DeadLetterChannelAggregatorMulticastRecipientListDynamicRouterLoopResequencerEnrichIdempotentConsumerThrottle, JSON Schema validation, Traced+Metered, lifecycle listeners, named IRedbService.

Заходной маршрут — POST /api/demo. Дальше идёт типовой ESB-pipeline:

HTTP → Throttle(10/s) → IdempotentConsumer → JSON Schema validation
  → Choice(mode) → Multicast(parallel)
      ├─ RabbitMQ RPC  → stamp.rabbit
      ├─ AMQP 1.0 RPC  → stamp.amqp
      ├─ gRPC RPC      → stamp.grpc
      └─ IBM MQ RPC    → stamp.wmq
  → BeginTransaction → SQL INSERT + SELECT → CommitTransaction
  → WireTap fan-out → Kafka + File + Redis + MQTT + SEDA
  → JSON response

Это ровно то, что в проде «склеивает» Camel у джавистов — только здесь это runnable demo на 1500 строк C#, который запускается одной командой против docker compose с RabbitMQ/Postgres/Kafka/Redis/MQTT.

Самая показательная секция для тех, кто думает «а как Route дружит с redb.Core» — NamedRedbRoutes.cs. Из маршрута поднимается named IRedbService (можно держать несколько подключений к разным БД одновременно — pg-testmssql-test), CRUD идёт напрямую через ProcessWithRedb("instance-name", async (redb, ex, ct) => ...):

internal sealed class NamedRedbRoutes : RouteBuilder
{
    protected override void Configure()
    {
        From("timer://named-redb-check?period=30000&delay=5000")
            .RouteId("demo-named-redb-pgsql")
            .Log("[NAMED-REDB] checking pg-test instance...")

            // Метаданные подключения — версия БД, имя cache-domain
            .ProcessWithRedb("pg-test", (redb, ex) =>
            {
                ex.In.Headers["pg-db-type"]      = redb.dbType    ?? "n/a";
                ex.In.Headers["pg-db-version"]   = redb.dbVersion ?? "n/a";
                ex.In.Headers["pg-cache-domain"] = redb.CacheDomain ?? "n/a";
            })
            .Log("[NAMED-REDB] pg-test: ${header.pg-db-type} ${header.pg-db-version}")

            // SAVE — RedbObject<TProps> с автогенерацией схемы по [RedbScheme]
            .ProcessWithRedb("pg-test", async (redb, ex, ct) =>
            {
                var item = new RedbObject<DemoItemProps>
                {
                    name  = $"Demo-{DateTime.UtcNow:HHmmss}",
                    Props = new DemoItemProps
                    {
                        Title       = "Named Redb Demo",
                        Description = $"Created at {DateTime.UtcNow:O}",
                        Priority    = Random.Shared.Next(1, 10)
                    }
                };
                var id = await redb.SaveAsync(item);
                ex.In.Headers["saved-id"] = id.ToString();
            })

            // LOAD by id
            .ProcessWithRedb("pg-test", async (redb, ex, ct) =>
            {
                var id = long.Parse(ex.In.Headers["saved-id"]!.ToString()!);
                var obj = await redb.LoadAsync<DemoItemProps>(id);
                ex.In.Headers["loaded"] = obj is null
                    ? "NOT FOUND"
                    : $"{obj.name} (priority={obj.Props?.Priority})";
            })

            // QUERY с LINQ-Where прямо по props
            .ProcessWithRedb("pg-test", async (redb, ex, ct) =>
            {
                var items = await redb.Query<DemoItemProps>()
                    .Where(p => p.Priority > 3)
                    .Take(5)
                    .ToListAsync();
                ex.In.Headers["query-count"] = items.Count.ToString();
            })

            // DELETE
            .ProcessWithRedb("pg-test", async (redb, ex, ct) =>
            {
                var id = long.Parse(ex.In.Headers["saved-id"]!.ToString()!);
                await redb.DeleteAsync(id);
            })
            .Log("[NAMED-REDB] PG CRUD cycle complete");

        // Тот же шаблон, но на MSSql-инстансе — provider абстрагирован
        From("timer://named-redb-mssql?period=30000&delay=8000")
            .RouteId("demo-named-redb-mssql")
            .ProcessWithRedb("mssql-test", async (redb, ex, ct) => { /* save / load / query / delete */ });
    }
}

Что тут видно:

  • ProcessWithRedb("name", ...) — это extension в пакете redb.Route.RedbCore. По имени достаёт нужный IRedbService из реестра (Redb секция в конфиге проекта), скоупит его на время обработки exchange'а и отдаёт в лямбду. Никакого IServiceProvider.GetRequiredService<IRedbService>() в коде маршрута.

  • SaveAsync / LoadAsync / Query<T>().Where(...).ToListAsync() / DeleteAsync — это стандартный IRedbService API, тот же что и в обычном ASP.NET-сервисе. Маршрут не делает ничего особенного — он просто получает уже инициализированный сервис.

  • Несколько именованных инстансов одновременно — pg-test и mssql-test живут параллельно, можно из одного маршрута писать в Postgres и одновременно зеркалить в MSSql.

  • Схема ([RedbScheme]-классы DemoItemProps) синхронизируется один раз при старте модуля (InitRoute.cs дёргает redb.InitializeAsync(ensureCreated: true)), дальше маршрут просто работает с типизированными RedbObject<TProps>.

То есть в проде из первого блока (tsum — SQL polling из SAP → redb.Query<TransportationRoute>() → апдейт) лежит ровно та же связка, что и здесь, в Demo — только обёрнутая в реальную бизнес-логику. Хотите потрогать без чужого прода — git clone, поднимите docker composedotnet run --project redb.Route.Demo, и оно ответит на POST http://localhost:5088/api/demo JSON-ом со всеми штампами от всех брокеров.


«Я люблю контроллеры из ASP.NET» — пожалуйста, redb.Route.Controllers

Отдельная категория комментариев, которая под такими статьями стабильно появляется: «зачем мне ваш fluent-DSL, я хочу [HttpGet("{id}")] и [FromBody] Order order, как привык в ASP.NET, и чтобы IDE подчёркивала несуществующие роуты». Для них есть пакет redb.Route.Controllers — контроллеры с теми же атрибутами, что и в ASP.NET, но transport-agnostic: один и тот же класс работает за HTTP, SignalR, gRPC или любым другим InOut-эндпоинтом.

[Route("orders")]
public class OrdersController : RedbController
{
    [HttpGet("{id}")]
    public Task<Order> GetOrder(int id, [FromHeader("X-Tenant")] string tenant)
    {
        var redb = Context.GetRedbService();              // тот же IRedbService
        return redb.Query<OrderProps>()
            .Where(o => o.Id == id && o.Tenant == tenant)
            .FirstAsync();
    }

    [HttpPost]
    public async Task<long> CreateOrder([FromBody] Order order, CancellationToken ct)
    {
        var redb = Context.GetRedbService();
        return await redb.SaveAsync(new RedbObject<OrderProps> { Props = order.ToProps() });
    }

    [HttpDelete("{id}")]
    public Task DeleteOrder([FromRoute("id")] int id) =>
        Context.GetRedbService().DeleteAsync(id);
}

Привязка маршрута к транспорту — одна строчка:

// HTTP — читает redbHttp.Method / redbHttp.Path из транспорта
From("http://0.0.0.0:8080/api")
    .RedbHttpController<OrdersController>();

// SignalR — тот же класс, dispatch по redbSignalR.Method
From("signalr://bridge")
    .RedbSignalRController<OrdersController>();

// gRPC — dispatch по dispatch-method из metadata
From("grpc://0.0.0.0:5000")
    .RedbGrpcController<OrdersController>();

// Или сразу пачкой по сборке — как MapControllers() в ASP.NET
var registry = new ControllerRegistry();
registry.RegisterAssembly(typeof(OrdersController).Assembly);
From("http://0.0.0.0:8080/api").RedbHttpController(registry);

Что тут важно понимать:

  • [Route][HttpGet/Post/Put/Delete/Patch][FromBody][FromHeader][FromQuery][FromRoute] — те же самые атрибуты, что вы знаете из ASP.NET. Кривая обучения нулевая.

  • Контроллер наследуется от RedbController и получает Context (тот самый IRouteContext) и Exchange напрямую — никакого IServiceProvider-инъекции в конструктор не нужно. Хотите DI — берёте сервис через Context.GetService<T>().

  • Один контроллер — три транспорта. Тот же OrdersController отвечает на GET /orders/42 через HTTP, на Send("GetOrder", 42) через SignalR-хаб и на gRPC-вызов с dispatch-method=GetOrder. Это та самая «adapter on top of transport»-идея, ради которой обычно пишут Mediator-обвязки руками.

  • Это не отдельный pipeline-инструмент, это IProcessor внутри маршрута. До контроллера можно влепить IdempotentConsumerThrottleOnException<T>WireTap в audit — а только потом отдать в controller dispatcher. Никакого «контроллер vs route» — это контроллер ВНУТРИ route:

From("http://0.0.0.0:8080/api")
    .Throttle(500).Per(TimeSpan.FromSeconds(1))
    .IdempotentConsumer(Header("Idempotency-Key"), repository: "redb")
    .WireTap("seda://audit")
    .RedbHttpController<OrdersController>();

Под капотом — ControllerRegistry сканирует сборку, собирает route table из атрибутов, на каждый запрос dispatcher читает headers, матчит template, биндит параметры ({id} → [FromRoute], query → [FromQuery], тело → [FromBody], остальное по конвенции имени). Ответ возвращается в exchange.In.Body, статус — в status.code (200 для значения, 204 для Task, 404 для no-match, 500 для исключения). Полная таблица атрибутов и conventions — в redb.Route.Controllers/README.md.

То есть это компромисс ровно для того лагеря, который и в 2026-м не готов отказаться от [HttpGet("{id}")]: внешне — привычный ASP.NET-style controller, внутри — обычный Route-pipeline со всеми EIP-плюшками и без привязки к Kestrel. Если ваш «HTTP API» завтра должен заодно слушать SignalR и gRPC — переписывать ничего не надо, добавляются ещё два From(...).Redb*Controller<OrdersController>().


redb.Tsak — где это хостить в проде

redb.Route описывает что делает маршрут. Когда у вас один проект — он живёт внутри dotnet run. Когда маршрутов 30 и они принадлежат разным командам — нужен runtime-контейнер.

tsak
tsak

redb.Tsak — это runtime container для модулей redb.Route. Каждый модуль (.dll или .tpkg-бандл) загружается в изолированный AssemblyLoadContext, имеет свой IRouteContext, свой DI, свои конфиги. Tsak управляет lifecycle'ом независимо: добавил модуль — он стартанул, убрал — выгрузился, обновил — hot-reload без рестарта процесса.

Что есть из коробки:

  • REST API — 32 endpoint'а: управление контекстами, маршрутами, модулями, кластером, scheduler'ом, логами, пользователями

  • CLI — 30 команд: tsak module uploadtsak context starttsak route stoptsak context list — удобно для CI/CD

  • Blazor Server dashboard — 10 страниц: CPU/RAM/GC, per-route latency, ring-buffer логи, статус watchdog

    endpoints
    endpoints
    monitoring
    monitoring
  • Hot-reload — кладёшь обновлённую .dll в Libs/, Tsak подхватывает, старая версия выгружается с graceful drain

  • Кластер — leader election + автоперераспределение контекстов между нодами, через redb-базу, без Redis / ZooKeeper / Consul

  • Quartz scheduler — RAMJobStore для standalone, AdoJobStore для кластера (схема создаётся автоматически)

  • OpenTelemetry — Activities + Meters на каждый маршрут и шаг, Prometheus scrape

Деплой нового маршрута:

# Вариант 1 — голая DLL. Бросаем сборку в Libs/, Tsak увидит, поднимет
# модуль с graceful drain старой версии. Подходит для дев-цикла.
cp Orders.dll /tsak/worker/Libs/

# Вариант 2 — .tpkg-бандл (ZIP с manifest.json + DLL + <Module>.config.json).
# Собирается прямо из .csproj — у redb.Route.Demo есть Target "PackTpkg"
# (AfterTargets="Build"), который кладёт .tpkg рядом и копирует в Tsak Libs.
# Так едет в прод: один файл, версионируемый, с конфигом и манифестом внутри.
cp Orders.tpkg /tsak/worker/Libs/
# или через CLI поверх REST API
tsak module upload orders --file Orders.tpkg
tsak context start orders

# Остановить один маршрут без рестарта процесса
tsak route stop orders order-pipeline

# Что сейчас работает
tsak context list
tsak route list orders

Папка Libs/ — это стандартная convention-точка модулей. В исходниках она лежит ровно по таким же путям: redb.Tsak/src/redb.Tsak.Worker/Libs/ для запуска из репозитория, worker/Libs/ в готовых релизных бинарниках. В Docker-образе — /app/worker/Libs/, монтируется как volume; добавили .dll или .tpkg снаружи — внутри контейнера Tsak поднял модуль без рестарта.

Минимальный пример MSBuild-таргета для упаковки .tpkg прямо из проекта модуля — манифест + DLL + конфиг в один ZIP, и сразу копия в Libs/ Tsak'а (как сделано в redb.Route.Demo.csproj):

<PropertyGroup>
  <TsakModuleName>Orders</TsakModuleName>
  <TsakLibsDir>$(MSBuildThisFileDirectory)..\..\tsak\worker\Libs</TsakLibsDir>
</PropertyGroup>

<Target Name="PackTpkg" AfterTargets="Build">
  <PropertyGroup>
    <_Staging>$(IntermediateOutputPath)tpkg</_Staging>
    <_Tpkg>$(MSBuildThisFileDirectory)output\$(TsakModuleName).tpkg</_Tpkg>
  </PropertyGroup>
  <RemoveDir Directories="$(_Staging)" />
  <MakeDir   Directories="$(_Staging)" />
  <Copy SourceFiles="$(MSBuildThisFileDirectory)manifest.json"            DestinationFolder="$(_Staging)" />
  <Copy SourceFiles="$(TargetPath)"                                        DestinationFolder="$(_Staging)" />
  <Copy SourceFiles="$(MSBuildThisFileDirectory)$(TsakModuleName).config.json"
        DestinationFolder="$(_Staging)"
        Condition="Exists('$(MSBuildThisFileDirectory)$(TsakModuleName).config.json')" />
  <ZipDirectory SourceDirectory="$(_Staging)" DestinationFile="$(_Tpkg)" Overwrite="true" />
  <Copy SourceFiles="$(_Tpkg)" DestinationFolder="$(TsakLibsDir)" />
  <Touch Files="$(TsakLibsDir)\$(TsakModuleName).tpkg" />
</Target>

dotnet build → Orders.tpkg в Libs/ → Tsak его поднял. В CI/CD получается одна артефакт-сущность с проставленной версией, конфигом и манифестом — деплой сводится к публикации файла.

Маршруты с cluster=true идут через redb-координатор. В кластерном режиме маршрут можно пометить как кластерный — тогда Tsak сам распределяет его экземпляры между нодами и следит, что только нужное количество копий запущено. Состояние, partitioning, балансировка — через redb objects/values. Quartz при этом использует свои AdoJobStore-таблицы в той же БД, но это его внутренние таблицы.

Добавить ноду в кластер — запустить ещё один экземпляр Tsak с той же строкой подключения к БД. Никакого ZooKeeper / Consul / Redis как внешней зависимости.

Тот же самый RouteBuilder, который вы написали для обычного IHostedService, работает в Tsak без изменений — тот же Configure(), тот же IExchange, те же OnException и .Transacted().

351 проходящий тест, Apache 2.0.

Про Tsak — отдельная большая статья. Здесь это упомянуто крупными мазками, потому что Tsak — это самостоятельный продукт со своей архитектурой: AssemblyLoadContext-изоляция, 5-слойный конфиг-pipeline, leader election на redb-объектах без внешнего координатора, Blazor-дашборд, REST API, CLI, hot-reload c graceful drain. В одну Route-статью это не влезает. Если зайдёт — напишу отдельно.


Где Route живёт в экосистеме redb

redb.Route — это не одиночный пакет, а слой в стопке продуктов, которые мы пишем последние годы и которые собираются друг из друга. По уровням снизу вверх:

  • redb.Core — ядро: типизированная объектная модель (RedbObject<TProps>), code-first схемы ([RedbScheme]), expression-LINQ-провайдер, кэши, сериализация, валидация, security. Всё абстрактно, без привязки к СУБД.

  • redb.Core.Pro — платная надстройка над Core: change tracking, materialization, миграции схем, расширенные query/scheme-провайдеры.

  • redb.Postgres / redb.MSSql — Free-провайдеры: реализация IRedbContext и фабричных методов RedbServiceBase поверх Npgsql / Microsoft.Data.SqlClient.

  • redb.Postgres.Pro / redb.MSSql.Pro — Pro-провайдеры: подтягивают Core.Pro-возможности (materialization, миграции) на конкретную СУБД.

  • redb.Identity — самостоятельный identity-сервер, построенный поверх redb-объектов: пользователи, роли, JWT, refresh, LDAP-федерация. Используется и в Route-маршрутах (тот же Http.Listen().UseJwt(...)), и снаружи.

  • redb.Export — экспорт/бэкап redb-данных в файлы. Используется в TsUM-проде как cron-маршрут (Cron.Every(...).Process(BackupRedbBase)).

  • redb.Route — то, про что эта статья. Берёт IRedbService из провайдера, добавляет EIP-DSL, транспорты, OpenTelemetry. Маршруты — обычные C#-классы, schema-aware: ProcessWithRedb("pg-test", ctx => ctx.SaveAsync(obj)).

  • redb.Tsak — runtime-контейнер для Route-маршрутов: hot-reload, кластер, дашборд. Поверх Core + одного из провайдеров + Route + Identity (для логина в Blazor-UI).

  • redb.Doc.Web — сайт документации, написан на Blazor поверх того же стека: контент-объекты лежат как RedbObject<DocPageProps> в Postgres, навигация — через Tree-провайдер Core, поиск — через query-провайдер. То есть документация про redb работает на самом redb.

Зависимости честно однонаправленные: Route не знает про Tsak, Tsak не знает про Doc.Web, Core ничего не знает про провайдеров. Можно взять только Core + Postgres и писать своё приложение без Route и Tsak; можно взять Core + Postgres + Route и хостить маршруты в обычном IHostedService; можно собрать всю стопку целиком и получить Tsak + Identity + дашборд + сайт.

Эта статья — про один слой (Route + Tsak). Про Core, Identity и Doc.Web будут отдельные.


Что redb.Route НЕ делает

Честно — потому что иначе через две недели придут с issue.

Durable saga с DB-state machine. Это к MassTransit / NServiceBus / Wolverine. У redb.Route есть Saga() с компенсирующими шагами: forward + reverse, если что-то упало посередине — откатываемся в обратном порядке. Но самой state-machine (где «заказ в состоянии PaymentPending, ждём webhook от Stripe три дня, не теряем при рестарте процесса») — нет. Можно собрать руками через IdempotentConsumer + persistent backend в redb, но это не managed решение.

Managed transactional outbox container. В MassTransit/Wolverine есть отдельный фреймворк, который создаёт таблицу outbox_messages, пишет в неё в той же транзакции, что бизнес-данные, а в фоне публикует. У Route это собирается из примитивов: Sql.Poll(...) + IdempotentConsumer + .Transacted(). То есть outbox-паттерн работает, но автомат-инсталлятор таблицы и фоновый daemon — не предоставлены, пишутся как часть маршрута.

300+ компонентов, как у Camel. 22 транспорта закрывают большую часть реальных задач (Kafka, RabbitMQ, IBM MQ, MQTT, HTTP, gRPC, SQL, SFTP, S3, LDAP, Mail, Quartz), но если вам нужен, скажем, Salesforce-bulk-API-консьюмер или ServiceNow-REST-клиент — это всё ещё «напиши свой компонент через IConsumer/IProducer», или использовать generic HTTP/gRPC поверх их API.

XML-DSL и автодеплой как у Camel K. Только C#-fluent. Никаких маршрутов из YAML. На наш взгляд — плюс (intellisense, рефакторинг, типы), но кому-то декларативный YAML удобнее.


Free vs Pro

Никакой Pro-версии redb.Route нет. Apache 2.0 целиком: все 22 транспорта, все EIP-паттерны, expression engine, Transacted(), OpenTelemetry, retry/DLC, Tsak runtime container, cluster, hot-reload, dashboard.

(Pro-версия есть только у redb.Core — там оптимизации хранилища: compiled queries, parallel materialization, change tracking. Route про это вообще не знает — он работает поверх любого redb-провайдера, любой версии.)


Что в 3.0.0 свежего

Параллельно со статьёй «Free/Pro query parity» про redb.Core 3.0.0 у Route тоже вышла мажорка. Главное:

  • Один canonical compiler. Раньше внутри жил параллельный v2-стек (OldRouteCompilerIRouteDefinition2BlockStack). Удалили. Теперь AST маршрута — это дерево IProcessorDefinition нод, каждая компилирует себя через CreateProcessor(IRouteContext). Никаких bridge-классов, никакого «v2 DSL → bridge → legacy compiler» — та же модель, что у Camel внутри.

  • Dynamic endpoints (ToD, dynamic WireTap, dynamic Enrich). URI получателя вычисляется на каждое сообщение через string template, IExpression или Func<IExchange, string>. Это Camel'овский toD(...) один-в-один.

  • String-template DSL для SetBody / SetHeader / SetProperty / Log. ${header.x}${body.OrderId} — компилируются один раз при build, не интерпретируются.

  • OnException parity с Camel. Добавлены LogStackTrace(bool)LogExhausted(bool), чтобы fluent-набор совпадал с Camel'овским onException(...).

  • Reference-тесты на Choice / TryCatch / Filter (~1200 строк), которые пинят семантику и ловят регрессии.

Полный CHANGELOG: redb.Route/CHANGELOG.md.


Сравнение в одной таблице

Apache Camel

MassTransit

NServiceBus

Wolverine

redb.Route

Платформа

JVM

.NET

.NET

.NET

.NET 8/9/10

Лицензия

Apache 2.0

Коммерческая (v9, Massient)

Коммерческая

MIT

Apache 2.0

Фокус

EIP routing

Message bus + Saga

Message bus + Saga

Mediator + Saga

ESB / EIP routing

Транспорты

300+

5

7

4

22 + 5 встроенных

EIP-паттерны

80+

Saga, R/R, Outbox

Saga, R/R

Saga, R/R, Outbox

30+

Expression engine

Simple Language (интерпретируемый)

Compiled (System.Linq.Expressions)

Конфиг

XML или Java DSL

C# fluent

C# fluent

C# fluent

C# fluent only

Durable saga

Да

Да (DB-backed)

Да (DB-backed)

Да (Marten / EF Core)

Только компенсирующие шаги + persistent IdempotentConsumer

Managed outbox

Plugin

Да

Да

Да

Собирается из Sql.Poll(...).Transacted() + IdempotentConsumer

Publisher confirms / Kafka EOS

Да

Да

Да

Да

Да

Runtime container

Camel K / JBoss Fuse

redb.Tsak (hot-reload, cluster)

Кратко: у Camel — больше всего, но он JVM. У трёх .NET-альтернатив — отличная durable saga + managed outbox, но не ESB, и при этом две из трёх уже коммерческие (MassTransit v9 переехал в Massient, Inc., NServiceBus был платным изначально). Свободным остался только Wolverine на MIT — но это mediator+saga, без 20+ транспортов и без полного EIP-каталога. У redb.Route — все 30+ ходовых EIP, 22 транспорта, compiled expressions, transactional pipelines, и Tsak как runtime-контейнер с кластером. Apache 2.0 целиком. Если вам нужно «Apache Camel в .NET, и желательно без счёта на лицензию» — это redb.Route.


Ссылки

GitHub org (все репозитории) Репозиторий redb.Route Репозиторий redb.Tsak Готовые бинарники redb.Tsak (releases) Docker-образ redb-tsak-stack (GHCR) README.md redb.Route с полной документацией redb.Route.Demo — 39 маршрутов, 18 транспортов, runnable reference CHANGELOG 3.0.0 Документация и примеры (EN) NuGet — все 43 пакета redb.* (~20 800 загрузок) — Route, Core, провайдеры, 22 транспорта, Tsak Архитектура redb.Core — на чём всё это построено Предыдущая статья — про redb.Core / Free vs Pro

Если дочитали — спасибо. Комментарии, баги, EXPLAIN-планы — всё в GitHub Discussions.

Комментарии (0)