Рассмотрим классическую схему построения конвейера сообщений в RabbitMQ состоящую из элементов Producer, Exchange, Queue и Consumer.
Задача состоит в том, что бы организовать мониторинг происходящего в очереди и не затронуть основное программное обеспечение (ПО), добавить гибкую возможность построения отчетов и при этом избежать дополнительных затрат. Итоговая конструкция должна позволять быстро строить отчеты для анализа потока данных на конвейере без использования основных мощностей серверов (что позволит избежать дополнительной нагрузки) и основного ПО. Подход должен быть легко переносимым на более сложную архитектуру.
Прежде всего организуем демонстрационный стенд, для этого внесем следующие изменения в работу конвейера:
Изначально для Exchange (faust) была задана следующая конфигурация, которая не меняется в рассматриваем примере при модификации:
Важным является настройка типа fanaut — которая позволяет создать две равноправные очереди и дублировать весь поток сообщений в новую очередь Statistics:
без какого бы то ни-было вмешательства в основной процесс в очереди Logs. Приступим к обработке потока сообщений. Прежде всего создаем таблицу на MS SQL сервере для хранения статистической информации. Вы можете использовать любой другой подход, например, сохранить сообщения в файл в xml формате или любой другой способ, в рассматриваемом примере выбран SQL сервер для того чтобы избежать дополнительного программирования
Как видно из SQL запроса это таблица с номером записи, некоторым текстом и датой вставки записи в таблицу.
Создадим клиента, который будет обращаться к RabbitMQ в очередь Statistics, забирать полученные данные и перекладывать их в таблицу RabbitMsg
Посмотрим, как это работает в режиме реального времени
Тем временем на MS SQL Server
Построим отчет по данным очереди Statistics
Вот что получилось:
В рассмотренном примере показано как быстро собрать статистику и даже построить отчет, который можно сохранить в PDF и отправить по почте по данным конвейера RabbitMQ и дополнительной очереди. Легко придумать примеры задач, когда информация собирается о каких-либо процессах и строятся отчеты без разработки серверной части. Учитывая, что FastReports предлагает open-source версию, то можно значительно сократить стоимость разработки без дополнительных затрат. Сам конвейер, также легко перенастраивается и может быть приспособлен для более сложных задач.
Задача состоит в том, что бы организовать мониторинг происходящего в очереди и не затронуть основное программное обеспечение (ПО), добавить гибкую возможность построения отчетов и при этом избежать дополнительных затрат. Итоговая конструкция должна позволять быстро строить отчеты для анализа потока данных на конвейере без использования основных мощностей серверов (что позволит избежать дополнительной нагрузки) и основного ПО. Подход должен быть легко переносимым на более сложную архитектуру.
Прежде всего организуем демонстрационный стенд, для этого внесем следующие изменения в работу конвейера:
Изначально для Exchange (faust) была задана следующая конфигурация, которая не меняется в рассматриваем примере при модификации:
Важным является настройка типа fanaut — которая позволяет создать две равноправные очереди и дублировать весь поток сообщений в новую очередь Statistics:
без какого бы то ни-было вмешательства в основной процесс в очереди Logs. Приступим к обработке потока сообщений. Прежде всего создаем таблицу на MS SQL сервере для хранения статистической информации. Вы можете использовать любой другой подход, например, сохранить сообщения в файл в xml формате или любой другой способ, в рассматриваемом примере выбран SQL сервер для того чтобы избежать дополнительного программирования
create table RabbitMsg(
id int PRIMARY KEY IDENTITY(1000,1),
[Message] nvarchar(1000) DEFAULT '',
RegDate datetime default GETDATE())
Как видно из SQL запроса это таблица с номером записи, некоторым текстом и датой вставки записи в таблицу.
Создадим клиента, который будет обращаться к RabbitMQ в очередь Statistics, забирать полученные данные и перекладывать их в таблицу RabbitMsg
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Data.SqlClient;
namespace Getter
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "192.168.1.241", Port = 30672, UserName = "robotics01", Password = "" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "faust", type: "fanout", durable: true);
var queueName = "Statistics";
channel.QueueBind(queue: queueName, exchange: "faust", routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
SqlConnection sqlconnection = new SqlConnection("Server=tcp:fastreportsql,1433;Initial Catalog=FastReportSQL;Persist Security Info=False;User ID=ufocombat;Password=;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;");
sqlconnection.Open();
SqlCommand cmd = new SqlCommand($"INSERT INTO RabbitMsg(Message) VALUES (@msg)", sqlconnection);
cmd.Parameters.AddWithValue("msg", message);
cmd.ExecuteNonQuery();
sqlconnection.Close();
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
Console.WriteLine("Hello World!");
}
}
}
Посмотрим, как это работает в режиме реального времени
Тем временем на MS SQL Server
Построим отчет по данным очереди Statistics
Вот что получилось:
Заключение
В рассмотренном примере показано как быстро собрать статистику и даже построить отчет, который можно сохранить в PDF и отправить по почте по данным конвейера RabbitMQ и дополнительной очереди. Легко придумать примеры задач, когда информация собирается о каких-либо процессах и строятся отчеты без разработки серверной части. Учитывая, что FastReports предлагает open-source версию, то можно значительно сократить стоимость разработки без дополнительных затрат. Сам конвейер, также легко перенастраивается и может быть приспособлен для более сложных задач.
Комментарии (9)
Gigazet
10.04.2019 18:26При определении consumer не нужно декларировать exchange. Достаточно имени exchange при декларации биндинга.
creasol
10.04.2019 19:58+1Кстати на тему мониторинга RabbitMQ я настраивал экспортер для него в prometheus а потом нескучные дашборды в grafana. Если этот стек используется, то мне кажется вполне подойдет.
Вот готовый дашборд:
grafana.com/dashboards/4279
А вот с оф. документации по самому экспортеру (можно его рядом положить или как плагин для rabbitmq поставить):
www.rabbitmq.com/prometheus.html
pallada92
Мне в RabbitMQ всегда казалось странным, что продюсеры и консьюмеры должны декларировать exchange, при этом указывая ее тип. То есть если мы внедряем мониторинг и меняем тип на fanout, то мы должны сделать исправления во всех консьюмерах и продюсерах, когда они декларируют эту exchange. Хотя они не должны знать, как она работает внутри. Это не критика вашей статьи, просто вещь, которая мне кажется странной.
Iv38
Уже созданный эксчендж не будет пересоздан. Таком образом можно поменять тип эксченджа не меняя код. А то что указано в коде будет использоваться по умолчанию.
pallada92
Насколько помню, в питоне
aio-pika
ругается, если при объявлении уже существующей exchange тип не совпадает.WGH
> Хотя они не должны знать, как она работает внутри.
Если они не хотят знать, они же могут её использовать без декларации. Exchange может быть создан кем-то другим.
ufocombat Автор
Ребят спасибо за комментарии, fanout был изначально в очереди хоть она и одна была — такой вот пример))) — статья подразумевает что нужно встроиться в то. что есть и тем более не претендует на то как нужно настраивать очереди