За что я люблю технологии 1С?

Это за широкую возможность интеграции с другими системами. А сколько возможностей открывают расширения, написанные на внешней библиотеке DLL. Предлагаю рассмотреть одну из таких возможностей - интеграции через брокер-сообщения.

Интеграция 1С в экосистеме

Я столкнулся с ситуацией, когда экосистема проектов написаны на разных платформах. У нас есть мобильное приложение, web-портал и несколько РИБ 1С баз и нам нужно настроить мгновенный обмен данными номенклатур между этими базами, желательно с минимальной задержкой. Я не буду говорить о структуре сообщения, которое нам нужно передать, это может быть XML, и JSON, что угодно. Давайте поговорим о способе передачи этих данных.

Требования:

  • большие объемы данных

  • оперативная выгрузка с минимальной задержкой

  • возможность принимать данные в нескольких потоках

Сейчас в 1С популярны способы интеграции:

  • HTTP и web-сервис на стороне 1С

  • подключение HTTP get/post запрос

  • синхронизация данных XML

  • ODATA соединение к базе 1С

  • внешние источники

Каждый из этих способов хорошо подходит для решения определенных задач, но не в нашей ситуации. Коротко опишу почему.

HTTP и web-сервис имеют ограничения на одновременное подключение, по умолчанию установлено ограничение на 10 параллельных запросов, можем увеличить количество по необходимости. Представьте, стороннее приложение начинает отправлять в цикле get/post запрос, этот лимит мгновенно заполнится, и начнут вылетать ошибки time-out. Эта война между приложениями никогда не закончится.

Еще одной причиной неэффективности таких способов является то, что мы не сможем выполнять оперативную выгрузку данных с минимальной задержкой. Эти способы не смогут обеспечить мгновенное получение данных из систем.

Решение этих задач я вижу в брокерах сообщений, таких как RabbiMQ, Kafka, NUTS, ActiveMQ. Мое внимание привлекла именно Kafka - непрерывная передача информации со smart-периферии (конечных устройств) в IoT-платформу.

Когда данные не только передаются, но и обрабатываются множеством клиентов, которые называются подписчиками (consumers).

В роли подписчиков выступают приложения и программные сервисы. Здесь имеют место отложенные вычисления, когда подписчиков меньше, чем сообщений от издателей - источников данных (producer). Сообщения (messages) записываются по разделам (partition), темам (topic) и хранятся в течение заданного периода. Подписчики сами опрашивают Kafka на предмет наличия новых сообщений и указывают, какие записи им нужно прочесть, увеличивая или уменьшая смещение к нужной записи.

Давайте перейдем к практике создания простого приложения на 1С для обмена данными. План создания архитектуры интеграции посредством Kafka будет следующий.

  • сначала напишем библиотеку DLL для установки соединения

  • чтобы стать подписчиком (consumers) и слушать топик на наличие новых данных, мы создадим «бессмертное» фоновое задание

  • для отправки данных в топик (producer) мы сделаем подписку в 1С на запись объектов, для мгновенной отправки

Поднять и настроить сервер Kafka не сложно, в интернете достаточно необходимой информации.

Напишем библиотеку DLL. Я выбрал язык программирования C# (опыта в этом языке у меня немного), вот мой пример:

using System;

using System.Collections.Generic;

 using System.Text;

 using System.Threading;

 using System.Net.NetworkInformation;

 using System.Security.Cryptography;

 using System.Runtime.InteropServices;

 using Confluent.Kafka;

 using System.Linq;

namespace Kafka1CConnect

 {

     [ComVisible(true)]

     [InterfaceType(ComInterfaceType.InterfaceIsIDispatch)]

     [Guid("764A91C9-7472-4DC6-810B-4EB31BE26DB9")]

     public interface Kafka1CEvent

     {

         [DispId(0x60020000)]

         string GetInfo();

     }

  

     [ComVisible(true)]

     [ClassInterface(ClassInterfaceType.AutoDual)]

     [Guid("A6498157-CCC0-47B9-8B0F-40BBD3AFF096")]

     [ComSourceInterfaces(typeof(Kafka1CEvent))]

     [ProgId("Kafka1C.ConnectKafka")]

     public class ConnectKafka : IDisposable

     {

  

         public ConnectKafka()

         {

             // do nothing

         }

  

         ~ConnectKafka()

         {

             Dispose();

         }

  

         private IProducer<string, string> Producer;

         private IConsumer<string, string> consumer;

         private CancellationTokenSource cts;

  

         public void Dispose()

         {

  

         }

         public void ProducerCreate(string ConnectString, int ComprType=1, int LingerMs=0, int mmb=1000000)

         {

             // do nothing

             var config = new ProducerConfig

             {

                 BootstrapServers = ConnectString,

                 Acks = Acks.All,

                 LingerMs = LingerMs,

                 MessageMaxBytes = mmb,

                 CompressionType = (CompressionType) ComprType

             };

  

  

             Producer = new ProducerBuilder<string, string>(config).Build();

         }

  

         public string SendProducer(string topic, string table, string value)

         {

             var msg = new Message<string, string> { Key = table, Value = value };

             //byte[] bytes = Encoding.UTF8.GetBytes("123123231");

             //msg.Headers.Add("head", bytes);

             var res = Producer.ProduceAsync(topic, msg).GetAwaiter().GetResult();

             return res.Status.ToString();

         }

  

         public void ProducerClose()

         {

             Producer.Flush(TimeSpan.FromSeconds(5));

             Producer.Dispose();

         }

  

         public void ConsumerCreate(string ConnectString, string topic, string GroupId, bool eof=false)

         {

             var config = new ConsumerConfig

             {

                 GroupId = GroupId,

                 BootstrapServers = ConnectString,

                 AutoOffsetReset = AutoOffsetReset.Earliest,

                 EnablePartitionEof = eof,

                 Debug = "cgrp"

             };

  

             consumer = new ConsumerBuilder<string, string>(config).Build();

             consumer.Subscribe(new string[] { topic });

         }

  

         public string[] Consume()

         {

             var AnswerArr = new List<string>();

             cts = new CancellationTokenSource();

             var tokenCancel = cts.Token;

             try

             {

                 var cr = consumer.Consume(tokenCancel);

                 if (cr == null)

                 {

                     return null;

                 }

                 else

                 {

                     if (!cr.IsPartitionEOF)

                     {

                         AnswerArr.Add(cr.Topic + "");

                         AnswerArr.Add(cr.Key + "");

                         AnswerArr.Add(cr.Value + "");

AnswerArr.Add(cr.Timestamp.UtcDateTime.ToString());

                     }

                     else

                     {

                         AnswerArr.Add("EOF");

                     }

                 }

             }

             catch (ConsumeException e)

             {

                 AnswerArr.Add(e.Error.Reason);

             }

  

             return AnswerArr.ToArray();

         }

  

         public void ConsumeClose()

        {

            cts.Cancel();

            consumer.Close();

        }

}}

В этой библиотеке основные методы это:

  • для чтения данных: ConsumerCreate(), Consume(), ConsumeClose()

  • для отправки данных: ProducerCreate(), SendProducer(), ProducerClose()

Теперь напишем фоновое задание с «бессмертным» циклом для непрерывного чтения данных:

kafka = Новый COMОбъект("Kafka1C.ConnectKafka"); kafka.ConsumerCreate("localhost","9092","test.topic","base01")

Пока Истина Цикл

ArrAns = kafka.Consume();

    Если ArrAns = NULL или ArrAns = Неопределено Тогда

        Сообщить("Обработано " + Строка(к) + " записей!" );

        Прервать;

    КонецЕсли;

    массив = ArrAns.Выгрузить();

    Если массив.Количество()=4 Тогда

        Топик =  массив[0]; // Топик (тема), с которого было прочитано сообщение

        Ключ =  массив[0]; // Ключ сообщения

        Значение =  массив[0]; // Сообщение

        Время =  массив[0]; // Время отправки

    ИначеЕсли массив.Количество()=1 Тогда

        Сообщить(массив[0];);

    КонецЕсли;

КонецЦикла;

kafka = NULL;

Думаю, в этом коде ничего сложного, мы создаем экземпляр библиотеки DLL в коде 1С и пользуемся его методами.

Обратите внимание, хоть цикл и непрерывный, при вызове метода kafka.Consume() цикл становится на паузу, в ожидании нового сообщения, это позволяет уменьшить лишнюю нагрузку на процессоры 1С.

Так мы обеспечили мгновенную отправку данных с базы А при записи объекта и мгновенное чтение в базе Б.

Я показал самый простой пример применения брокер сообщений для интеграции внутри баз 1С и с другими базами. Это привело к развитию микросервисных систем с моментально быстрым с обменом данным. Все задачи решались на много быстрее. Также решили проблему логирования сообщений и хранение истории обмена для отказоустойчивой системы интеграции.

Целью данной статьи было не показать best-practices или не убеждать вас, уважаемые читатели, что для шины данных может использоваться только Apache Kafka, а предать свой опыт, для раздумья и развития вашего собственной экосистемы. Вы можете использовать любой брокер сообщения, для шины данных, главное правильно построить архитектуру и без "фанатизма".

В итоге мы применение нашей шины в таких проектах, как: MDM (быстрое согласование и контроль данных), DWH (сбор структурированных данных), CRM (быстрое согласование заявок).

Комментарии (8)


  1. acwartz
    18.08.2021 14:52

    Я извиняюсь, как вам удается писать на 1с?

    Ладно ещё литералы строковые, иногда туда каюсь русское что-то пишу.

    Но блин в коде имена функций с включениями русского и английского - непостижимо.


    1. tumyp_k Автор
      18.08.2021 14:54

      Хехе, ну приходится. Каюсь, иногда чешутся руки писать на латинице)


    1. vis_inet
      18.08.2021 14:55

      Писать код можно полностью на латинице, если это удобнее.


  1. anonymous
    00.00.0000 00:00


  1. MacIn
    19.08.2021 02:41

    Топик = массив[0]; // Топик (тема), с которого было прочитано сообщение
    Ключ = массив[0]; // Ключ сообщения
    Значение = массив[0]; // Сообщение
    Время = массив[0]; // Время отправки

    А нет ли здесь copy-paste ошибки?


  1. VaalKIA
    19.08.2021 11:39

    HTTP и web-сервис имеют ограничения на одновременное подключение, по умолчанию установлено ограничение на 10 параллельных запросов, можем увеличить количество по необходимости.

    Дайте пруф на эту информацию, и что не так с увеличением количества?


    1. Fragster
      19.08.2021 12:07
      +1

      При включении повторного использования сеансов будет ошибка (HTTP status) 406, если одновременно подключений больше, чем указано в пуле. Ничего страшного в этом нет. Повторное использование сеансов экономит некоторое количество времени на инициализацию параметров сеанса (если в параметрах сеанса много кода, например для заполнения значений RLS, это могут быть единицы секунд для баз уровня ERP), и, если база не активная, - на заполнении кэша метаданных на сервере 1с, который выгружает кэш при закрытии последнего соединения, а это уже могут быть даже десятки секунд, если диски медленные, а база ERP или КА.


  1. anonymous
    00.00.0000 00:00