За что я люблю технологии 1С?
Это за широкую возможность интеграции с другими системами. А сколько возможностей открывают расширения, написанные на внешней библиотеке DLL. Предлагаю рассмотреть одну из таких возможностей - интеграции через брокер-сообщения.
Интеграция 1С в экосистеме
Я столкнулся с ситуацией, когда экосистема проектов написаны на разных платформах. У нас есть мобильное приложение, web-портал и несколько РИБ 1С баз и нам нужно настроить мгновенный обмен данными номенклатур между этими базами, желательно с минимальной задержкой. Я не буду говорить о структуре сообщения, которое нам нужно передать, это может быть XML, и JSON, что угодно. Давайте поговорим о способе передачи этих данных.
Требования:
большие объемы данных
оперативная выгрузка с минимальной задержкой
возможность принимать данные в нескольких потоках
Сейчас в 1С популярны способы интеграции:
HTTP и web-сервис на стороне 1С
подключение HTTP get/post запрос
синхронизация данных XML
ODATA соединение к базе 1С
внешние источники
Каждый из этих способов хорошо подходит для решения определенных задач, но не в нашей ситуации. Коротко опишу почему.
HTTP и web-сервис имеют ограничения на одновременное подключение, по умолчанию установлено ограничение на 10 параллельных запросов, можем увеличить количество по необходимости. Представьте, стороннее приложение начинает отправлять в цикле get/post запрос, этот лимит мгновенно заполнится, и начнут вылетать ошибки time-out. Эта война между приложениями никогда не закончится.
Еще одной причиной неэффективности таких способов является то, что мы не сможем выполнять оперативную выгрузку данных с минимальной задержкой. Эти способы не смогут обеспечить мгновенное получение данных из систем.
Решение этих задач я вижу в брокерах сообщений, таких как RabbiMQ, Kafka, NUTS, ActiveMQ. Мое внимание привлекла именно Kafka - непрерывная передача информации со smart-периферии (конечных устройств) в IoT-платформу.
Когда данные не только передаются, но и обрабатываются множеством клиентов, которые называются подписчиками (consumers).
В роли подписчиков выступают приложения и программные сервисы. Здесь имеют место отложенные вычисления, когда подписчиков меньше, чем сообщений от издателей - источников данных (producer). Сообщения (messages) записываются по разделам (partition), темам (topic) и хранятся в течение заданного периода. Подписчики сами опрашивают Kafka на предмет наличия новых сообщений и указывают, какие записи им нужно прочесть, увеличивая или уменьшая смещение к нужной записи.
Давайте перейдем к практике создания простого приложения на 1С для обмена данными. План создания архитектуры интеграции посредством Kafka будет следующий.
сначала напишем библиотеку DLL для установки соединения
чтобы стать подписчиком (consumers) и слушать топик на наличие новых данных, мы создадим «бессмертное» фоновое задание
для отправки данных в топик (producer) мы сделаем подписку в 1С на запись объектов, для мгновенной отправки
Поднять и настроить сервер Kafka не сложно, в интернете достаточно необходимой информации.
Напишем библиотеку DLL. Я выбрал язык программирования C# (опыта в этом языке у меня немного), вот мой пример:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Net.NetworkInformation;
using System.Security.Cryptography;
using System.Runtime.InteropServices;
using Confluent.Kafka;
using System.Linq;
namespace Kafka1CConnect
{
[ComVisible(true)]
[InterfaceType(ComInterfaceType.InterfaceIsIDispatch)]
[Guid("764A91C9-7472-4DC6-810B-4EB31BE26DB9")]
public interface Kafka1CEvent
{
[DispId(0x60020000)]
string GetInfo();
}
[ComVisible(true)]
[ClassInterface(ClassInterfaceType.AutoDual)]
[Guid("A6498157-CCC0-47B9-8B0F-40BBD3AFF096")]
[ComSourceInterfaces(typeof(Kafka1CEvent))]
[ProgId("Kafka1C.ConnectKafka")]
public class ConnectKafka : IDisposable
{
public ConnectKafka()
{
// do nothing
}
~ConnectKafka()
{
Dispose();
}
private IProducer<string, string> Producer;
private IConsumer<string, string> consumer;
private CancellationTokenSource cts;
public void Dispose()
{
}
public void ProducerCreate(string ConnectString, int ComprType=1, int LingerMs=0, int mmb=1000000)
{
// do nothing
var config = new ProducerConfig
{
BootstrapServers = ConnectString,
Acks = Acks.All,
LingerMs = LingerMs,
MessageMaxBytes = mmb,
CompressionType = (CompressionType) ComprType
};
Producer = new ProducerBuilder<string, string>(config).Build();
}
public string SendProducer(string topic, string table, string value)
{
var msg = new Message<string, string> { Key = table, Value = value };
//byte[] bytes = Encoding.UTF8.GetBytes("123123231");
//msg.Headers.Add("head", bytes);
var res = Producer.ProduceAsync(topic, msg).GetAwaiter().GetResult();
return res.Status.ToString();
}
public void ProducerClose()
{
Producer.Flush(TimeSpan.FromSeconds(5));
Producer.Dispose();
}
public void ConsumerCreate(string ConnectString, string topic, string GroupId, bool eof=false)
{
var config = new ConsumerConfig
{
GroupId = GroupId,
BootstrapServers = ConnectString,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = eof,
Debug = "cgrp"
};
consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(new string[] { topic });
}
public string[] Consume()
{
var AnswerArr = new List<string>();
cts = new CancellationTokenSource();
var tokenCancel = cts.Token;
try
{
var cr = consumer.Consume(tokenCancel);
if (cr == null)
{
return null;
}
else
{
if (!cr.IsPartitionEOF)
{
AnswerArr.Add(cr.Topic + "");
AnswerArr.Add(cr.Key + "");
AnswerArr.Add(cr.Value + "");
AnswerArr.Add(cr.Timestamp.UtcDateTime.ToString());
}
else
{
AnswerArr.Add("EOF");
}
}
}
catch (ConsumeException e)
{
AnswerArr.Add(e.Error.Reason);
}
return AnswerArr.ToArray();
}
public void ConsumeClose()
{
cts.Cancel();
consumer.Close();
}
}}
В этой библиотеке основные методы это:
для чтения данных: ConsumerCreate(), Consume(), ConsumeClose()
для отправки данных: ProducerCreate(), SendProducer(), ProducerClose()
Теперь напишем фоновое задание с «бессмертным» циклом для непрерывного чтения данных:
kafka = Новый COMОбъект("Kafka1C.ConnectKafka"); kafka.ConsumerCreate("localhost","9092","test.topic","base01")
Пока Истина Цикл
ArrAns = kafka.Consume();
Если ArrAns = NULL или ArrAns = Неопределено Тогда
Сообщить("Обработано " + Строка(к) + " записей!" );
Прервать;
КонецЕсли;
массив = ArrAns.Выгрузить();
Если массив.Количество()=4 Тогда
Топик = массив[0]; // Топик (тема), с которого было прочитано сообщение
Ключ = массив[0]; // Ключ сообщения
Значение = массив[0]; // Сообщение
Время = массив[0]; // Время отправки
ИначеЕсли массив.Количество()=1 Тогда
Сообщить(массив[0];);
КонецЕсли;
КонецЦикла;
kafka = NULL;
Думаю, в этом коде ничего сложного, мы создаем экземпляр библиотеки DLL в коде 1С и пользуемся его методами.
Обратите внимание, хоть цикл и непрерывный, при вызове метода kafka.Consume() цикл становится на паузу, в ожидании нового сообщения, это позволяет уменьшить лишнюю нагрузку на процессоры 1С.
Так мы обеспечили мгновенную отправку данных с базы А при записи объекта и мгновенное чтение в базе Б.
Я показал самый простой пример применения брокер сообщений для интеграции внутри баз 1С и с другими базами. Это привело к развитию микросервисных систем с моментально быстрым с обменом данным. Все задачи решались на много быстрее. Также решили проблему логирования сообщений и хранение истории обмена для отказоустойчивой системы интеграции.
Целью данной статьи было не показать best-practices или не убеждать вас, уважаемые читатели, что для шины данных может использоваться только Apache Kafka, а предать свой опыт, для раздумья и развития вашего собственной экосистемы. Вы можете использовать любой брокер сообщения, для шины данных, главное правильно построить архитектуру и без "фанатизма".
В итоге мы применение нашей шины в таких проектах, как: MDM (быстрое согласование и контроль данных), DWH (сбор структурированных данных), CRM (быстрое согласование заявок).
Комментарии (8)
MacIn
19.08.2021 02:41Топик = массив[0]; // Топик (тема), с которого было прочитано сообщение
Ключ = массив[0]; // Ключ сообщения
Значение = массив[0]; // Сообщение
Время = массив[0]; // Время отправки
А нет ли здесь copy-paste ошибки?
VaalKIA
19.08.2021 11:39HTTP и web-сервис имеют ограничения на одновременное подключение, по умолчанию установлено ограничение на 10 параллельных запросов, можем увеличить количество по необходимости.
Дайте пруф на эту информацию, и что не так с увеличением количества?Fragster
19.08.2021 12:07+1При включении повторного использования сеансов будет ошибка (HTTP status) 406, если одновременно подключений больше, чем указано в пуле. Ничего страшного в этом нет. Повторное использование сеансов экономит некоторое количество времени на инициализацию параметров сеанса (если в параметрах сеанса много кода, например для заполнения значений RLS, это могут быть единицы секунд для баз уровня ERP), и, если база не активная, - на заполнении кэша метаданных на сервере 1с, который выгружает кэш при закрытии последнего соединения, а это уже могут быть даже десятки секунд, если диски медленные, а база ERP или КА.
acwartz
Я извиняюсь, как вам удается писать на 1с?
Ладно ещё литералы строковые, иногда туда каюсь русское что-то пишу.
Но блин в коде имена функций с включениями русского и английского - непостижимо.
tumyp_k Автор
Хехе, ну приходится. Каюсь, иногда чешутся руки писать на латинице)
vis_inet
Писать код можно полностью на латинице, если это удобнее.