В этой статье будет рассказано как с помощью Masstransit и Quartz настроить шедулер для отправки сообщений в шину. Шина это абстракция слабой связи между тем кто посылает сообщения и тем кто подписывается на них через шину. Посылатель и приниматель сообщений знают только тип сообщения(это интерфейс), но не знают ничего друг о друге.

Само управление Quartz в Masstransit происходит через очередь, выделяем для этого очередь:

Startup.cs

cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue);

Посмотреть весь код

В точке обращения к шедулеру(у меня в примере это метод контроллера) получаем объект ISendEndpoint для заданной очереди:

ISendEndpoint _sendEndpoint =
 await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}"));

И отправляем сообщение с настройками для периодичного шедулинга сообщений в некую очередь:


 _sheduler.ScheduledRecurringMessage =

await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>(
new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"),
new PollExternalSystemSchedule(),
new
{
 Message = $"1 sec Quartz {User.Identity.Name}"
});

В PollExternalSystemSchedule настройки периодичности

    public class PollExternalSystemSchedule : DefaultRecurringSchedule
    {
        public PollExternalSystemSchedule()
        {
            StartTime = DateTime.UtcNow;
            CronExpression = "* * * * * ? *";//1 sec
        }
    }

Строку для задания периодичности можно сгенерить тут.

К шине в которую шедулер будет отправлять сообщения можно уже привязывать консьюмеры(те кто получит сообщение):

cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e =>
{
 e.ConfigureConsumer<ShedulerCommandConsumer>(provider); 
});

Консьюмер привязывается по адресу(адрес очереди выше) и интерфейсу:

    public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand>
    {
        private ILogger<ShedulerCommandConsumer> _logger;
        public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger)
        {
            _logger = loger;
        }

        public Task Consume(ConsumeContext<IRepeatingCommand> context)
        {
            _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} ");

            return Task.CompletedTask;
        }
    }

Для остановки шедулера вызываем:

await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage);

Исходный код

Могу еще рассказать про работу конечного автомата Saga(в реальном проекте сообщения так же могут идти через него).

Комментарии (0)