В прошлой статье мы сгенерировали Enpoint-ы WebApi по описанию на основе классов, свойств и атрибутов. В этой статье мы добавим генерацию абстракций EventHandler-ов, работающих с шиной, саму шину, реализацию EventHandler-ов для MassTransit, узнаем во сколько раз больше мы генерируем, чем пишем (на этот раз без ошибки).

(первая часть, вторая часть, третья часть, четвертая часть)

Отправка сообщений в шину

Отправка сообщений реализована одним интерфейсом с одним методом (просто отправить данные).

IMessageBus
using System.Threading.Tasks;

namespace Domain.Common.Interfaces.Infrastructure.MessageBus
{
    /// <summary>
    /// Интерфейс, описывающий работу с шиной
    /// </summary>
    public interface IMessageBus
    {
        /// <summary>
        /// Отсылает сообщение в шину
        /// </summary>
        /// <typeparam name="T">Тип сообщения</typeparam>
        /// <param name="message">Сообщение</param>
        /// <param name="busName">Имя шины</param>
        /// <returns>Таск для ожидания</returns>
        Task Send<T>(T message);
    }
}

Чтение сообщений из шины

У нас есть 2 Generic интерфейса для описания чтения сообщений из шины IEventHandler<T> и BatchEventHandler<T>. Первый обрабатывает сообщения по одному, второй — пачками. Так как это Generic интерфейсы, для удобства регистрации в контейнере и резолва они приведены к единому интерфейсу IEventHandlerBase.

Единый не-Generic интерфейс для Generic - хорошая практика.

IEventHandler<T>
namespace Domain.Common.Interfaces.Application.Events
{
    /// <summary>
    /// Выполняет обработку сообщений
    /// </summary>
    /// <typeparam name="T">Тип сообщения</typeparam>
    public interface IEventHandler<T> : IEventHandlerBase
    {
        /// <summary>
        /// Выполняет обработку сообщения
        /// </summary>
        /// <param name="message">обьект сообщения</param>
        void Handle(T message);
    }
}
IBatсhEventHandler<T>
using System.Collections.Generic;

namespace Domain.Common.Interfaces.Application.Events
{
    /// <summary>
    /// Выполняет обработку сообщений
    /// </summary>
    /// <typeparam name="T">Тип сообщения</typeparam>
    public interface IBatсhEventHandler<T> : IEventHandlerBase
    {
        /// <summary>
        /// Выполняет обработку сообщения
        /// </summary>
        /// <param name="messages">обьекты сообщения</param>
        void Handle(List<T> messages);
        /// <summary>
        /// Размер батча
        /// </summary>
        int BatchSize { get; set; }
    }
}
IEventHandlerBase
using System;

namespace Domain.Common.Interfaces.Application.Events
{
    /// <summary>
    /// Базовый интерфейс для обработки сообщений
    /// </summary>
    public interface IEventHandlerBase
    {
        /// <summary>
        /// Тип события обработчика событий
        /// </summary>
        Type EventType { get; }
    }
}

Какой ужас, но ведь это синхронные интерфейсы! Ведь везде async/await и TPL!

Да, зачем мне городить Task/async, если этот метод все равно вызывается в отдельном таске, который ничего не делает? Незачем ).

Генерация нашего BatchEventHandler в Application.Workers (Application)

Тут мы генерируем наше описание обработчика пакета событий. Обработчик простой - Берем источник данных и делаем BatchInsert.

Генератор реализации IBatchEventHandler
using CodeGen.GeneratorBase;
using CodeGen.GeneratorBase.Context;
using CodeGen.Generators.WebApiWithBulkInsert;
using CodeGeneration.GeneratorBase;
using System.Collections.Generic;

namespace CodeGen.Generators.RequestEntity.Application.Workers
{
    class RequestEntityBatchEventHandlerGenerator : CodeGeneratorBase<RequestEntityGeneratorDTO>
    {
        private ICodeGeneratorScanner<RequestEntityGeneratorDTO> scanner;
        public RequestEntityBatchEventHandlerGenerator() 
        {
            place = GeneratorRunPlace.ApplicationWorkers;
            scanner = new RequestEntityScanner();
        }

        public override void Generate(GenerationContext context, List<RequestEntityGeneratorDTO> data)
        {
            foreach (var entity in data)
            {
                context.Project.AddSource($"Generated_WebApiWithBulkInsert_BusWorkers_{entity.requestEntityType.Name}", GenerateBusWorker(entity));
            }
        }

        private string GenerateBusWorker(RequestEntityGeneratorDTO entity)
        {
            var res = $@"
using Domain.Common.Interfaces.Application.Events;
using Domain.Common.Interfaces.Infrastructure.DAL;
using Microsoft.Extensions.Logging;
using {entity.requestEntityType.Namespace}

namespace Application.Workers
{{
    internal class BatchEventHandler{entity.requestEntityType.Name} : IBatсhEventHandler<{entity.requestEntityType.Name}>
    {{
        protected IDataSource<{entity.requestEntityType.Name}> _dataSource;
        protected ILogger _logger;
        public BatchEventHandler{entity.requestEntityType.Name}(IDataSource<{entity.requestEntityType.Name}> dataSource, ILogger logger) 
        {{ 
            _dataSource = dataSource;
            _logger = logger;
        }}
        
        public int BatchSize {{ get; set; }}

        public void Handle(List<{entity.requestEntityType.Name}> messages)
        {{
            try
            {{
                _dataSource.BatchInsert(messages);
            }}
            catch(Exception ex)
            {{
                _logger.LogError(ex, null);
                throw;
            }}
        }}
    }}
}}


";
            return res;
            
        }

        public override List<RequestEntityGeneratorDTO> Parse(GenerationContext context)
        {
            return scanner.Scan(context);
        }
    }
}

Генерация реализации IBatchEventHandler для MassTransit (Infrastructure)

Тут все просто:

  1. Генерируем класс, реализующий IConsumer из MassTransit

  2. резолвим наш IBatchEventHandler в конструкторе

  3. Вызываем наш Handle в Consume, в таске

Генератор реализации IConsumer для MassTransit
using CodeGen.GeneratorBase;
using CodeGen.GeneratorBase.Context;
using CodeGeneration.GeneratorBase;
using System.Collections.Generic;

namespace CodeGen.Generators.WebApiWithBulkInsert.Infrastructure.MessageBus
{
    class RequestEntityInfrastructureWorkerGenerator : CodeGeneratorBase<RequestEntityGeneratorDTO>
    {
        private ICodeGeneratorScanner<RequestEntityGeneratorDTO> scanner;
        public RequestEntityInfrastructureWorkerGenerator() 
        {
            place = GeneratorRunPlace.InfrastructureMessageBus;
            scanner = new RequestEntityScanner();
        }

        public override void Generate(GenerationContext context, List<RequestEntityGeneratorDTO> data)
        {
            foreach (var entity in data)
            {
                context.Project.AddSource($"Generated_WebApiWithBulkInsert_BatchEventHandler_{entity.requestEntityType.Name}", GenerateBusWorker(entity));
            }
        }

        private string GenerateBusWorker(RequestEntityGeneratorDTO entity)
        {
            var res = $@"
using Domain.Common.Interfaces.Application.Events;
using MassTransit;
using System;
using System.Collections.Generic;
using {entity.requestEntityType.Namespace};
using System.Threading.Tasks;
using System.Linq;
using MassTransit;
using System.Text;

namespace Infrastructure.Workers
{{
    internal class Generated_WebApiWithBulkInsert_BatchEventHandler_{entity.requestEntityType.Name} : IConsumer<Batch<{entity.requestEntityType.Name}>>

    {{
        protected IBatсhEventHandler<{entity.requestEntityType.Name}> _eventHandler;
        public Generated_WebApiWithBulkInsert_BatchEventHandler_{entity.requestEntityType.Name}(IBatсhEventHandler<{entity.requestEntityType.Name}> eventHandler) 
        {{ 
            _eventHandler = eventHandler;
            
        }}
        
        public int BatchSize {{ get; set; }}

        public Task Consume(ConsumeContext<Batch<{entity.requestEntityType.Name}>> context)
        {{
            var items = context.Message.Select(i => i.Message).ToList();

            Action<object> action = (object taskParams) =>
            {{
                //Получаем исходный обьект
                taskParamsMessage{entity.requestEntityType.Name} services = (taskParamsMessage{entity.requestEntityType.Name})taskParams;

                //Вызываем Handle в нашей абстракции
                services.handler.Handle(services.messages);               
            }};

            var state = new taskParamsMessage{entity.requestEntityType.Name}()
            {{
                handler = _eventHandler,
                messages = items
            }};

            var task = new Task(action, state);

            task.Start();

            return task;
        }}
    }}

    internal class taskParamsMessage{entity.requestEntityType.Name}
    {{
        public List<{entity.requestEntityType.Name}> messages {{ get; set; }}

        public IBatсhEventHandler<{entity.requestEntityType.Name}> handler {{ get; set; }}
    }}
}}
";
            return res;

        }

        public override List<RequestEntityGeneratorDTO> Parse(GenerationContext projectContext)
        {
            return scanner.Scan(projectContext);
        }
    }
}

Теперь зарегистрируем все это в контейнере

Регистрация простая, я не стал делать отдельный вспомогательный класс (мне лень).

Просто получаем все неабстрактные классы с интерфейсом и регистрируем.

Регистрируем BatchEventhandler-ы из Application.Workers
using Domain.Common.Interfaces.Application.Events;
using Domain.Common.Interfaces.Application.Workers;
        public static IServiceCollection RegisterBusHandlers(this IServiceCollection collection)
        {
            collection.RegisterHandlers();
            var provider = collection.BuildServiceProvider();

            //var handlers = provider.GetServices<IEventHandlerBase>();

            //collection.AddMassTransit(x =>
            //{
            //    foreach (var handler in handlers)
            //    {
            //        x.AddConsumer(handler.GetType());
            //    }
            //});

            return collection;
        }


        public static IServiceCollection RegisterHandlers(this IServiceCollection collection)
        {
            var sp_local = collection.BuildServiceProvider();

            var types = System.Reflection.Assembly.GetExecutingAssembly().GetTypes()
                .Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i == typeof(IEventHandlerBase)));

            foreach (var type in types)
            {
                var service = (IEventHandlerBase)sp_local.GetService(type);
                collection.AddTransient(type);
                collection.AddTransient<IEventHandlerBase>(sp => (IEventHandlerBase)sp.GetService(type));

                var interfaceDef = typeof(IEventHandler<>);
                var implGenInterface = interfaceDef.MakeGenericType(service.EventType);

                collection.AddTransient(implGenInterface, type);
            }

            return collection;
        }

Остается только вызвать методы регистрации в наших приложениях (cmd сервер и asp.net mvc), и все.

Теперь все наши сгенерированные классы будут работать.

Шина

Конфигурация шины храниться естественно в Infrastructure.MessageBus.

Так же в Infrastructure.Bus хранится код для регистрации всех реализаций IConsumer.

Конфигурация шины выполняется методом ConfigureMessageBus.

Пока конфигурация шины и обработчиков сообщений из шины еще не доделана.

Основные моменты по генерации

Хотелось бы отметить новые моменты в генерации(которые помогут Вам писать по 2-3 генератора за спринт).

Первое, что хотелось бы отметить, это порядок создания генератора:

1) Сначала напишите класс

2) Затем создайте класс генератора

3) Если классов несколько (а не генерация методов, как для WebApi), сделайте foreach по всем DTO с данными для генерации.

4) Скопируйте написанный класс, замените необходимые места (названия классов, части имен) на данные из DTO для генерации:

internal class Generated_WebApiWithBulkInsert_BatchEventHandler_{entity.requestEntityType.Name} : IConsumer<Batch<{entity.requestEntityType.Name}>>

5) Все, теперь этого boilerplate нет

А за сколько можно написать свой BoilerTemplate?

Если вы будете делать что-то проще (например, BoilerTemplate с WebAPI в 10к RPS):

1) Генерировать все описания для DbContext (еще 1-3 класса)

2) Crud (1 класс сервис 1 класс команда)

3) Объекты для взаимодействия по REST API (в стиле передал id и одно\несколько полей) (1-2 класса)

4) Эндпоинты для CRUD (1 -16 классов)

5) Объекты для описания поиска (1 класс)

6) Поиск (1 класс)

7) Эндпоинты для поиска (1 класс)

Вам понадобится написать еще всего-лишь 10-20 генераторов

(Ну и авторизация).

Это относительно немного.

Заключение

Скачать и посмотреть можно тут.

Всего написано 4 генератора (два в этой статье):

  1. WebAPI (Infrastructure.Web)

  2. DTO для данных WebApi (Application.Common)

  3. IBatchEventHandler (Application.Workers)

  4. Реализация обработчика батчей через MassTransit (Infrastructure.MessageBus)

Коэффициент кодогенерации (сколько кода пишем руками к количеству сгенерированного):

272/24=11,3

По уровню нагрузки — в день я могу добавлять хоть 10 конечных точек.

Генераторы заняли 400 строк.

На описание среднего Endpoint-а ушло порядка 20 строк.

Вся кодогенерация «окупается» за 2 конечные точки. (Дай бог половина АПИ одного IOT устройства).

В следующей статье я наконец-то запущу Kafka у себя и реализую IDataSource (или сделаю заглушку). И измерю свои 10k RPS :).

И обязательно определюсь, с чем я работаю, с Шиной и сообщениями, или с Событями и обработчиками событий.

Комментарии (2)


  1. NeriaLab
    31.07.2025 05:56

    Мне напомнило про данный тип событий - https://github.com/TAGC/AsyncEvent или я ошибаюсь?!


    1. ValeriyPus Автор
      31.07.2025 05:56

      В первой статье все описано