Рассмотрим классическую схему построения конвейера сообщений в RabbitMQ состоящую из элементов Producer, Exchange, Queue и Consumer.



Задача состоит в том, что бы организовать мониторинг происходящего в очереди и не затронуть основное программное обеспечение (ПО), добавить гибкую возможность построения отчетов и при этом избежать дополнительных затрат. Итоговая конструкция должна позволять быстро строить отчеты для анализа потока данных на конвейере без использования основных мощностей серверов (что позволит избежать дополнительной нагрузки) и основного ПО. Подход должен быть легко переносимым на более сложную архитектуру.

Прежде всего организуем демонстрационный стенд, для этого внесем следующие изменения в работу конвейера:



Изначально для Exchange (faust) была задана следующая конфигурация, которая не меняется в рассматриваем примере при модификации:



Важным является настройка типа fanaut — которая позволяет создать две равноправные очереди и дублировать весь поток сообщений в новую очередь Statistics:





без какого бы то ни-было вмешательства в основной процесс в очереди Logs. Приступим к обработке потока сообщений. Прежде всего создаем таблицу на MS SQL сервере для хранения статистической информации. Вы можете использовать любой другой подход, например, сохранить сообщения в файл в xml формате или любой другой способ, в рассматриваемом примере выбран SQL сервер для того чтобы избежать дополнительного программирования

create table RabbitMsg(  
id int PRIMARY KEY IDENTITY(1000,1),  
[Message] nvarchar(1000) DEFAULT '',
RegDate datetime default GETDATE()) 

Как видно из SQL запроса это таблица с номером записи, некоторым текстом и датой вставки записи в таблицу.

Создадим клиента, который будет обращаться к RabbitMQ в очередь Statistics, забирать полученные данные и перекладывать их в таблицу RabbitMsg

using System; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 
using System.Text; 
using System.Data.SqlClient; 
  
namespace Getter 
{ 
    class Program 
    { 
        static void Main(string[] args) 
        { 
            var factory = new ConnectionFactory() { HostName = "192.168.1.241", Port = 30672, UserName = "robotics01", Password = "" }; 
            using (var connection = factory.CreateConnection()) 
            using (var channel = connection.CreateModel()) 
            { 
                channel.ExchangeDeclare(exchange: "faust", type: "fanout", durable: true); 
                 
                var queueName  = "Statistics"; 
                channel.QueueBind(queue: queueName, exchange: "faust", routingKey: ""); 
  
                Console.WriteLine(" [*] Waiting for logs."); 
  
                var consumer = new EventingBasicConsumer(channel); 
                consumer.Received += (model, ea) => 
                { 
                    var body = ea.Body; 
                    var message = Encoding.UTF8.GetString(body); 
                    Console.WriteLine(" [x] {0}", message); 
  
                    SqlConnection sqlconnection = new SqlConnection("Server=tcp:fastreportsql,1433;Initial Catalog=FastReportSQL;Persist Security Info=False;User ID=ufocombat;Password=;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"); 
                    sqlconnection.Open(); 
                    SqlCommand cmd = new SqlCommand($"INSERT INTO RabbitMsg(Message) VALUES (@msg)", sqlconnection); 
                    cmd.Parameters.AddWithValue("msg", message); 
  
                    cmd.ExecuteNonQuery(); 
                    sqlconnection.Close(); 
                }; 
                channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); 
  
                Console.WriteLine(" Press [enter] to exit."); 
                Console.ReadLine(); 
            } 
            Console.WriteLine("Hello World!"); 
        } 
    } 
} 

Посмотрим, как это работает в режиме реального времени


Тем временем на MS SQL Server



Построим отчет по данным очереди Statistics


Вот что получилось:



Заключение


В рассмотренном примере показано как быстро собрать статистику и даже построить отчет, который можно сохранить в PDF и отправить по почте по данным конвейера RabbitMQ и дополнительной очереди. Легко придумать примеры задач, когда информация собирается о каких-либо процессах и строятся отчеты без разработки серверной части. Учитывая, что FastReports предлагает open-source версию, то можно значительно сократить стоимость разработки без дополнительных затрат. Сам конвейер, также легко перенастраивается и может быть приспособлен для более сложных задач.

Комментарии (9)


  1. pallada92
    09.04.2019 17:25
    +3

    Мне в RabbitMQ всегда казалось странным, что продюсеры и консьюмеры должны декларировать exchange, при этом указывая ее тип. То есть если мы внедряем мониторинг и меняем тип на fanout, то мы должны сделать исправления во всех консьюмерах и продюсерах, когда они декларируют эту exchange. Хотя они не должны знать, как она работает внутри. Это не критика вашей статьи, просто вещь, которая мне кажется странной.


    1. Iv38
      10.04.2019 14:37

      Уже созданный эксчендж не будет пересоздан. Таком образом можно поменять тип эксченджа не меняя код. А то что указано в коде будет использоваться по умолчанию.


      1. pallada92
        10.04.2019 14:50

        Насколько помню, в питоне aio-pika ругается, если при объявлении уже существующей exchange тип не совпадает.


    1. WGH
      10.04.2019 15:34

      > Хотя они не должны знать, как она работает внутри.

      Если они не хотят знать, они же могут её использовать без декларации. Exchange может быть создан кем-то другим.


    1. ufocombat Автор
      10.04.2019 18:22

      Ребят спасибо за комментарии, fanout был изначально в очереди хоть она и одна была — такой вот пример))) — статья подразумевает что нужно встроиться в то. что есть и тем более не претендует на то как нужно настраивать очереди


  1. Gigazet
    10.04.2019 18:26

    При определении consumer не нужно декларировать exchange. Достаточно имени exchange при декларации биндинга.


  1. creasol
    10.04.2019 19:58
    +1

    Кстати на тему мониторинга RabbitMQ я настраивал экспортер для него в prometheus а потом нескучные дашборды в grafana. Если этот стек используется, то мне кажется вполне подойдет.

    Вот готовый дашборд:
    grafana.com/dashboards/4279

    А вот с оф. документации по самому экспортеру (можно его рядом положить или как плагин для rabbitmq поставить):
    www.rabbitmq.com/prometheus.html


    1. ufocombat Автор
      10.04.2019 19:59

      Подскажите там код у вас на c# бегло посмотрел или на чем ваш сайт написант, можно на код посмотреть, тема очень актуальная


      1. creasol
        11.04.2019 11:16

        Не совсем понял, о каком коде идет речь? Проект о котором я говорил это маркетплейс skybuy.ru
        И c# там никакого нет :)