
В прошлой статье мы сгенерировали Enpoint-ы WebApi по описанию на основе классов, свойств и атрибутов. В этой статье мы добавим генерацию абстракций EventHandler-ов, работающих с шиной, саму шину, реализацию EventHandler-ов для MassTransit, узнаем во сколько раз больше мы генерируем, чем пишем (на этот раз без ошибки).
(первая часть, вторая часть, третья часть, четвертая часть)
Отправка сообщений в шину
Отправка сообщений реализована одним интерфейсом с одним методом (просто отправить данные).
IMessageBus
using System.Threading.Tasks;
namespace Domain.Common.Interfaces.Infrastructure.MessageBus
{
/// <summary>
/// Интерфейс, описывающий работу с шиной
/// </summary>
public interface IMessageBus
{
/// <summary>
/// Отсылает сообщение в шину
/// </summary>
/// <typeparam name="T">Тип сообщения</typeparam>
/// <param name="message">Сообщение</param>
/// <param name="busName">Имя шины</param>
/// <returns>Таск для ожидания</returns>
Task Send<T>(T message);
}
}
Чтение сообщений из шины
У нас есть 2 Generic интерфейса для описания чтения сообщений из шины IEventHandler<T> и BatchEventHandler<T>. Первый обрабатывает сообщения по одному, второй — пачками. Так как это Generic интерфейсы, для удобства регистрации в контейнере и резолва они приведены к единому интерфейсу IEventHandlerBase.
Единый не-Generic интерфейс для Generic - хорошая практика.
IEventHandler<T>
namespace Domain.Common.Interfaces.Application.Events
{
/// <summary>
/// Выполняет обработку сообщений
/// </summary>
/// <typeparam name="T">Тип сообщения</typeparam>
public interface IEventHandler<T> : IEventHandlerBase
{
/// <summary>
/// Выполняет обработку сообщения
/// </summary>
/// <param name="message">обьект сообщения</param>
void Handle(T message);
}
}
IBatсhEventHandler<T>
using System.Collections.Generic;
namespace Domain.Common.Interfaces.Application.Events
{
/// <summary>
/// Выполняет обработку сообщений
/// </summary>
/// <typeparam name="T">Тип сообщения</typeparam>
public interface IBatсhEventHandler<T> : IEventHandlerBase
{
/// <summary>
/// Выполняет обработку сообщения
/// </summary>
/// <param name="messages">обьекты сообщения</param>
void Handle(List<T> messages);
/// <summary>
/// Размер батча
/// </summary>
int BatchSize { get; set; }
}
}
IEventHandlerBase
using System;
namespace Domain.Common.Interfaces.Application.Events
{
/// <summary>
/// Базовый интерфейс для обработки сообщений
/// </summary>
public interface IEventHandlerBase
{
/// <summary>
/// Тип события обработчика событий
/// </summary>
Type EventType { get; }
}
}
Какой ужас, но ведь это синхронные интерфейсы! Ведь везде async/await и TPL!
Да, зачем мне городить Task/async, если этот метод все равно вызывается в отдельном таске, который ничего не делает? Незачем ).
Генерация нашего BatchEventHandler в Application.Workers (Application)
Тут мы генерируем наше описание обработчика пакета событий. Обработчик простой - Берем источник данных и делаем BatchInsert.
Генератор реализации IBatchEventHandler
using CodeGen.GeneratorBase;
using CodeGen.GeneratorBase.Context;
using CodeGen.Generators.WebApiWithBulkInsert;
using CodeGeneration.GeneratorBase;
using System.Collections.Generic;
namespace CodeGen.Generators.RequestEntity.Application.Workers
{
class RequestEntityBatchEventHandlerGenerator : CodeGeneratorBase<RequestEntityGeneratorDTO>
{
private ICodeGeneratorScanner<RequestEntityGeneratorDTO> scanner;
public RequestEntityBatchEventHandlerGenerator()
{
place = GeneratorRunPlace.ApplicationWorkers;
scanner = new RequestEntityScanner();
}
public override void Generate(GenerationContext context, List<RequestEntityGeneratorDTO> data)
{
foreach (var entity in data)
{
context.Project.AddSource($"Generated_WebApiWithBulkInsert_BusWorkers_{entity.requestEntityType.Name}", GenerateBusWorker(entity));
}
}
private string GenerateBusWorker(RequestEntityGeneratorDTO entity)
{
var res = $@"
using Domain.Common.Interfaces.Application.Events;
using Domain.Common.Interfaces.Infrastructure.DAL;
using Microsoft.Extensions.Logging;
using {entity.requestEntityType.Namespace}
namespace Application.Workers
{{
internal class BatchEventHandler{entity.requestEntityType.Name} : IBatсhEventHandler<{entity.requestEntityType.Name}>
{{
protected IDataSource<{entity.requestEntityType.Name}> _dataSource;
protected ILogger _logger;
public BatchEventHandler{entity.requestEntityType.Name}(IDataSource<{entity.requestEntityType.Name}> dataSource, ILogger logger)
{{
_dataSource = dataSource;
_logger = logger;
}}
public int BatchSize {{ get; set; }}
public void Handle(List<{entity.requestEntityType.Name}> messages)
{{
try
{{
_dataSource.BatchInsert(messages);
}}
catch(Exception ex)
{{
_logger.LogError(ex, null);
throw;
}}
}}
}}
}}
";
return res;
}
public override List<RequestEntityGeneratorDTO> Parse(GenerationContext context)
{
return scanner.Scan(context);
}
}
}
Генерация реализации IBatchEventHandler для MassTransit (Infrastructure)
Тут все просто:
Генерируем класс, реализующий IConsumer из MassTransit
резолвим наш IBatchEventHandler в конструкторе
Вызываем наш Handle в Consume, в таске
Генератор реализации IConsumer для MassTransit
using CodeGen.GeneratorBase;
using CodeGen.GeneratorBase.Context;
using CodeGeneration.GeneratorBase;
using System.Collections.Generic;
namespace CodeGen.Generators.WebApiWithBulkInsert.Infrastructure.MessageBus
{
class RequestEntityInfrastructureWorkerGenerator : CodeGeneratorBase<RequestEntityGeneratorDTO>
{
private ICodeGeneratorScanner<RequestEntityGeneratorDTO> scanner;
public RequestEntityInfrastructureWorkerGenerator()
{
place = GeneratorRunPlace.InfrastructureMessageBus;
scanner = new RequestEntityScanner();
}
public override void Generate(GenerationContext context, List<RequestEntityGeneratorDTO> data)
{
foreach (var entity in data)
{
context.Project.AddSource($"Generated_WebApiWithBulkInsert_BatchEventHandler_{entity.requestEntityType.Name}", GenerateBusWorker(entity));
}
}
private string GenerateBusWorker(RequestEntityGeneratorDTO entity)
{
var res = $@"
using Domain.Common.Interfaces.Application.Events;
using MassTransit;
using System;
using System.Collections.Generic;
using {entity.requestEntityType.Namespace};
using System.Threading.Tasks;
using System.Linq;
using MassTransit;
using System.Text;
namespace Infrastructure.Workers
{{
internal class Generated_WebApiWithBulkInsert_BatchEventHandler_{entity.requestEntityType.Name} : IConsumer<Batch<{entity.requestEntityType.Name}>>
{{
protected IBatсhEventHandler<{entity.requestEntityType.Name}> _eventHandler;
public Generated_WebApiWithBulkInsert_BatchEventHandler_{entity.requestEntityType.Name}(IBatсhEventHandler<{entity.requestEntityType.Name}> eventHandler)
{{
_eventHandler = eventHandler;
}}
public int BatchSize {{ get; set; }}
public Task Consume(ConsumeContext<Batch<{entity.requestEntityType.Name}>> context)
{{
var items = context.Message.Select(i => i.Message).ToList();
Action<object> action = (object taskParams) =>
{{
//Получаем исходный обьект
taskParamsMessage{entity.requestEntityType.Name} services = (taskParamsMessage{entity.requestEntityType.Name})taskParams;
//Вызываем Handle в нашей абстракции
services.handler.Handle(services.messages);
}};
var state = new taskParamsMessage{entity.requestEntityType.Name}()
{{
handler = _eventHandler,
messages = items
}};
var task = new Task(action, state);
task.Start();
return task;
}}
}}
internal class taskParamsMessage{entity.requestEntityType.Name}
{{
public List<{entity.requestEntityType.Name}> messages {{ get; set; }}
public IBatсhEventHandler<{entity.requestEntityType.Name}> handler {{ get; set; }}
}}
}}
";
return res;
}
public override List<RequestEntityGeneratorDTO> Parse(GenerationContext projectContext)
{
return scanner.Scan(projectContext);
}
}
}
Теперь зарегистрируем все это в контейнере
Регистрация простая, я не стал делать отдельный вспомогательный класс (мне лень).
Просто получаем все неабстрактные классы с интерфейсом и регистрируем.
Регистрируем BatchEventhandler-ы из Application.Workers
using Domain.Common.Interfaces.Application.Events;
using Domain.Common.Interfaces.Application.Workers;
public static IServiceCollection RegisterBusHandlers(this IServiceCollection collection)
{
collection.RegisterHandlers();
var provider = collection.BuildServiceProvider();
//var handlers = provider.GetServices<IEventHandlerBase>();
//collection.AddMassTransit(x =>
//{
// foreach (var handler in handlers)
// {
// x.AddConsumer(handler.GetType());
// }
//});
return collection;
}
public static IServiceCollection RegisterHandlers(this IServiceCollection collection)
{
var sp_local = collection.BuildServiceProvider();
var types = System.Reflection.Assembly.GetExecutingAssembly().GetTypes()
.Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i == typeof(IEventHandlerBase)));
foreach (var type in types)
{
var service = (IEventHandlerBase)sp_local.GetService(type);
collection.AddTransient(type);
collection.AddTransient<IEventHandlerBase>(sp => (IEventHandlerBase)sp.GetService(type));
var interfaceDef = typeof(IEventHandler<>);
var implGenInterface = interfaceDef.MakeGenericType(service.EventType);
collection.AddTransient(implGenInterface, type);
}
return collection;
}
Остается только вызвать методы регистрации в наших приложениях (cmd сервер и asp.net mvc), и все.
Теперь все наши сгенерированные классы будут работать.
Шина
Конфигурация шины храниться естественно в Infrastructure.MessageBus.
Так же в Infrastructure.Bus хранится код для регистрации всех реализаций IConsumer.
Конфигурация шины выполняется методом ConfigureMessageBus.
Пока конфигурация шины и обработчиков сообщений из шины еще не доделана.
Основные моменты по генерации
Хотелось бы отметить новые моменты в генерации(которые помогут Вам писать по 2-3 генератора за спринт).
Первое, что хотелось бы отметить, это порядок создания генератора:
1) Сначала напишите класс
2) Затем создайте класс генератора
3) Если классов несколько (а не генерация методов, как для WebApi), сделайте foreach по всем DTO с данными для генерации.
4) Скопируйте написанный класс, замените необходимые места (названия классов, части имен) на данные из DTO для генерации:
internal class Generated_WebApiWithBulkInsert_BatchEventHandler_{entity.requestEntityType.Name} : IConsumer<Batch<{entity.requestEntityType.Name}>>
5) Все, теперь этого boilerplate нет
А за сколько можно написать свой BoilerTemplate?
Если вы будете делать что-то проще (например, BoilerTemplate с WebAPI в 10к RPS):
1) Генерировать все описания для DbContext (еще 1-3 класса)
2) Crud (1 класс сервис 1 класс команда)
3) Объекты для взаимодействия по REST API (в стиле передал id и одно\несколько полей) (1-2 класса)
4) Эндпоинты для CRUD (1 -16 классов)
5) Объекты для описания поиска (1 класс)
6) Поиск (1 класс)
7) Эндпоинты для поиска (1 класс)
Вам понадобится написать еще всего-лишь 10-20 генераторов
(Ну и авторизация).
Это относительно немного.
Заключение
Скачать и посмотреть можно тут.
Всего написано 4 генератора (два в этой статье):
WebAPI (Infrastructure.Web)
DTO для данных WebApi (Application.Common)
IBatchEventHandler (Application.Workers)
Реализация обработчика батчей через MassTransit (Infrastructure.MessageBus)
Коэффициент кодогенерации (сколько кода пишем руками к количеству сгенерированного):
272/24=11,3
По уровню нагрузки — в день я могу добавлять хоть 10 конечных точек.
Генераторы заняли 400 строк.
На описание среднего Endpoint-а ушло порядка 20 строк.
Вся кодогенерация «окупается» за 2 конечные точки. (Дай бог половина АПИ одного IOT устройства).
В следующей статье я наконец-то запущу Kafka у себя и реализую IDataSource (или сделаю заглушку). И измерю свои 10k RPS :).
И обязательно определюсь, с чем я работаю, с Шиной и сообщениями, или с Событями и обработчиками событий.
NeriaLab
Мне напомнило про данный тип событий - https://github.com/TAGC/AsyncEvent или я ошибаюсь?!
ValeriyPus Автор
В первой статье все описано