В этой статье будет рассказано как с помощью Masstransit и Quartz настроить шедулер для отправки сообщений в шину. Шина это абстракция слабой связи между тем кто посылает сообщения и тем кто подписывается на них через шину. Посылатель и приниматель сообщений знают только тип сообщения(это интерфейс), но не знают ничего друг о друге.
Само управление Quartz в Masstransit происходит через очередь, выделяем для этого очередь:
Startup.cs
Посмотреть весь код
В точке обращения к шедулеру(у меня в примере это метод контроллера) получаем объект ISendEndpoint для заданной очереди:
И отправляем сообщение с настройками для периодичного шедулинга сообщений в некую очередь:
В PollExternalSystemSchedule настройки периодичности
Строку для задания периодичности можно сгенерить тут.
К шине в которую шедулер будет отправлять сообщения можно уже привязывать консьюмеры(те кто получит сообщение):
Консьюмер привязывается по адресу(адрес очереди выше) и интерфейсу:
Для остановки шедулера вызываем:
Исходный код
Могу еще рассказать про работу конечного автомата Saga(в реальном проекте сообщения так же могут идти через него).
Само управление Quartz в Masstransit происходит через очередь, выделяем для этого очередь:
Startup.cs
cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue);
Посмотреть весь код
В точке обращения к шедулеру(у меня в примере это метод контроллера) получаем объект ISendEndpoint для заданной очереди:
ISendEndpoint _sendEndpoint =
await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}"));
И отправляем сообщение с настройками для периодичного шедулинга сообщений в некую очередь:
_sheduler.ScheduledRecurringMessage =
await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>(
new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"),
new PollExternalSystemSchedule(),
new
{
Message = $"1 sec Quartz {User.Identity.Name}"
});
В PollExternalSystemSchedule настройки периодичности
public class PollExternalSystemSchedule : DefaultRecurringSchedule
{
public PollExternalSystemSchedule()
{
StartTime = DateTime.UtcNow;
CronExpression = "* * * * * ? *";//1 sec
}
}
Строку для задания периодичности можно сгенерить тут.
К шине в которую шедулер будет отправлять сообщения можно уже привязывать консьюмеры(те кто получит сообщение):
cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e =>
{
e.ConfigureConsumer<ShedulerCommandConsumer>(provider);
});
Консьюмер привязывается по адресу(адрес очереди выше) и интерфейсу:
public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand>
{
private ILogger<ShedulerCommandConsumer> _logger;
public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger)
{
_logger = loger;
}
public Task Consume(ConsumeContext<IRepeatingCommand> context)
{
_logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} ");
return Task.CompletedTask;
}
}
Для остановки шедулера вызываем:
await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage);
Исходный код
Могу еще рассказать про работу конечного автомата Saga(в реальном проекте сообщения так же могут идти через него).