Введение

В наше время, с развитием бизнеса и увеличением объемов автоматизации и бизнес-процессов, эффективная автоматизация некоторых таких бизнес-процессов становится ключевым фактором успеха для многих компаний. Наша компания не является исключением. По мере роста и развития нашего бизнеса наша CRM-система столкнулась с проблемами медленной работы и увеличения блокировок в базе. Увеличившееся количество бизнес-процессов и интеграций привело к сильной загрузке системы, что начало вызывать блокировки в базе данных и временные остановки процессов внутри компании – ситуация, которую нельзя оставить без изменений.

Мы поняли, что для продолжения успешной работы нам необходимо решение, способное справиться с ежедневной нагрузкой, интегрировать новые сервисы и обрабатывать сложные процессы без задержек. Таким образом, мы столкнулись с вопросами: как избавиться от проблем блокировки, как оптимизировать интеграции и как эффективно обрабатывать сложные запросы, не загружая при этом базу данных.

В результате наших размышлений мы приняли решение разделить нашу CRM-систему на микросервисы. В этой статье я хотел бы поделиться с вами нашим опытом разгрузки CRM-системы и показать, какими путями мы шли для решения выше поставленных вопросов.

Самое начало

Изначально наша CRM-система была загружена не сильно. Было несколько бизнес-процессов, которые в основном отвечали за обновление суммы счета при изменении товара в нем. Все эти процессы были реализованы с использованием триггеров, поскольку наша система, к сожалению, не предоставляла других возможностей. Однако со временем количество таких бизнес-процессов, реализованных на триггерах, стало увеличиваться.

Кроме того, появились и другие системы, которые должны были обмениваться данными с нашей CRM-системой, такие как 1С, сайт, складская система 1С и так далее. В результате всех этих интеграций и процессов мы столкнулись с увеличением количества блокировок, которые существенно затрудняли работу сотрудников внутри нашей CRM-системы.

Разделение на Мастер - Слейв

Первое, что пришло нам в голову, это разделить базу на Master-Slave. Мы включили репликацию транзакциями и стали получать данные на слейве в реальном времени, что позволило нам вынести все интеграции и скрипты, которые активно читают базу, на слейв, чтобы не загружать мастера, на котором активно работают пользователи. Это помогло на первое время, пока количество бизнес-процессов (которые были реализованы на триггерах) не стало еще больше. И тут мы начали задумываться, как можно избавиться от этих триггеров и сделать так, чтобы они отрабатывали асинхронно, не блокируя транзакцию пользователя.

Немного о триггерах

Когда пользователь вставляет запись в таблицу, запускаются триггеры на INSERT (AFTER/INSTEAD OF). И пока все триггеры, которые запускаются при событии добавления записи, не отработают, транзакция на вставку новой записи не завершится. Таким образом, мы можем столкнуться (и столкнулись) с блокировками.

Мы решили перенести все триггеры на Python-скрипты. Однако нам нужно было придумать, как мы будем получать новые транзакции и тип этой транзакции (INSERT/UPDATE/DELETE). И тут мы пришли к CDC и Kafka-Connect.

Подключение CDC и Kafka-Connect к MSSQL

Тут нам на помощь пришли CDC и Kafka Connect. На слейве мы включили CDC. Почему именно на слейве? Здесь все просто: при включении CDC MSSQL создает системные таблицы, в которых хранит все транзакции, которые были сделаны. Поскольку у нас за сутки может быть более 2 миллионов транзакций над одной таблицей, такой журнал будет занимать много места, что может негативно сказаться на производительности сервера с работающей базой данных. Поэтому решено было включить CDC на слейве, чтобы не затрагивать (не нагружать) мастера. Кроме того, у нас настроена репликация транзакциями, поэтому данные на слейв поступают в реальном времени.

CDC (Change Data Capture) - это механизм, используемый в базах данных SQL для отслеживания и захвата изменений данных в базе данных. CDC позволяет автоматизировано отслеживать и записывать изменения, производимые над данными в реальном времени. Этот механизм обычно используется для репликации данных, управления журналами изменений, аудита и других целей, где важно знать, какие изменения были внесены в базу данных и когда.

CDC обычно работает путем захвата изменений, происходящих с данными, и записи этих изменений в специальные таблицы - таблицы журнала изменений (change log tables) или журнальные файлы.

Kafka Connect - это платформа для интеграции данных, которая позволяет легко и надежно подключать различные источники и приемники данных к Apache Kafka. Она предоставляет готовые инструменты для создания и запуска коннекторов, которые позволяют передавать данные между Kafka-топиками и внешними системами.

Концепция Kafka Connect основана на идее использования легковесных коннекторов для обеспечения интеграции с различными системами без необходимости писать собственный код. Коннекторы могут быть разработаны и поддерживаться сообществом или сторонними поставщиками, что позволяет упростить процесс интеграции и повысить масштабируемость.

Kafka Connect состоит из двух основных компонентов:

Connectors - компоненты, которые определяют и управляют потоком данных между источниками данных и Kafka, а также между Kafka и приемниками данных.

Workers - процессы, запускаемые на узлах кластера Kafka Connect, которые выполняют коннекторы и обрабатывают передачу данных.

Kafka Connect позволяет значительно упростить процесс интеграции данных, улучшить надежность и масштабируемость передачи данных между различными системами с помощью Apache Kafka.

Включение CDC

Hidden text
USE [db_name];

GO

-- Включение CDC для БД

EXEC sys.sp_cdc_enable_db
GO

--Включение CDC для Таблицы posts
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'posts',
@role_name = NULL,
@filegroup_name = N'my_file_group' --Можно создать в настройках базы

GO

После включения CDC, мы развернули кластер Kafka для получения данных из таблиц. Мы решили, что данные в топиках будут храниться не более часа, а вся история будет грузиться в Minio на случай если придется работать только с затронутыми за период данными (а такие БП есть). 

После чего мы подключили Kafka-Connect к базе и начали получать данные в топиках Kafka

Пример Kafka-Connect к MSSQL

Hidden text
#!/bin/bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '
{
"name": "<тут название>",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "<Тут IP MSSQL>",
"database.port": "<Тут Порт>",
"database.user": "<Тут Пользователь БД>",
"database.password": "<Тут пароль от пользователя БД>",
"database.names": "<Тут Название БД>",
"database.encrypt": "false",
"topic.prefix": "<тут название префикса>", 
"table.include.list": "<тут таблица>",
"schema.history.internal.kafka.bootstrap.servers": "<Тут адреса Кластера Kafka указанные через ,>", 
"schema.history.internal.kafka.topic": "####"
 }
}

Пример Kafka-Connect к S3

Hidden text
#!/bin/bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"name": "<тут название>",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"store.url":"<Тут адрес Minio/S3>",
"tasks.max":"1",
"topics":"<Тут Название топика из которого нужно забирать данные>",
"s3.bucket.name":"<Тут Название Бакета в Minio/S3>",
"s3.region":"us-east-1",
"s3.part.size":"5242880", 
"flush.size":"100000", 
"locale":"en",
"path.format":"'date'=YYYY-MM-dd/'hour'=HH",
"partition.duration.ms":"600000",
"rotate.schedule.interval.ms":"60000",
"timezone":"UTC",
"timestamp.extractor":"Record",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"enable.ssl.certificate.verification":"False",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"behavior.on.null.values":"ignore"
 }
}

Пример данных в Kafka

Hidden text
{
   "schema": {
      "type": "struct",
      "fields": [],
      "optional": false,
      "name": "####",
      "version": 1
   },
   "payload": {
      "before": {},
      "after": {},
      "source": {
         "version": "2.1.2.Final",
         "connector": "sqlserver",
         "name": "####",
         "ts_ms": 1703674805647,
         "snapshot": "false",
         "db": "####",
         "sequence": null,
         "schema": "####",
         "table": "####",
         "change_lsn": "00002073:00081cb5:0006",
         "commit_lsn": "00002073:00081da4:0020",
         "event_serial_no": 2
      },
      "op": "u",
      "ts_ms": 1703674805975,
      "transaction": null
   }
}

Таким образом мы получаем все новые транзакции в топиках Kafka. И в реальном времени можем их обрабатывать  + у нас есть тип транзакций: 

- r - Read (Появляется когда Kafka считывает все данные из таблицы)

- c - Create Появляется при вставке новой записи в таблицу 

- u - Update Проявляется при изменении записи в таблице 

- d - Delete появляется при удалении записи из таблицы 

Также в топиках есть данные о записи которая была до изменения, что позволяет нам сравнивать данные до и после изменения.

Получение данных из топиков Kafka в Pyhon

О том как работает Kakfa есть хорошая статья: тут https://practicum.yandex.ru/blog/broker-soobsheniy-apache-kafka/ и тут https://habr.com/ru/companies/slurm/articles/550934/

Как же нам получать данные в наших скриптах? 

Мы использовали библиотеку kafka и с ее помощью подключались к кластеру Kafka

KafkaConsumer(
            self.__context.topik,
            bootstrap_servers=",".join(credentials['kafka'['bootstrap_servers']),
            auto_offset_reset='latest',
            group_id=f"{self.__context.table}.{self.__context.trigger}",
            value_deserializer=lambda x: loads(x.decode('utf-8')) if x is not None else None,
            api_version=(0,10)
        )

Здесь все достаточно просто. Первым параметром мы указываем топик который будем слушать, далее указываем список адресов Kafka в кластере через запятую. Далее в параметре auto_offset_reset указываем что хотим получать только новые записи. И наконец параметр group_id в котором мы указываем ID группы слушателей. Так как один топик могут слушать несколько скриптов Consumer'ов, то нам нужно указать group_id, чтобы в контексте одной группы, каждый скрипт получал уникальное сообщение.  Таким образом мы можем разгрузить нагруженные топики увеличив кол-во патриций и кол-во Consumer'ов.

Теперь как же обрабатывать данные? Во-первых в данных, которые мы получаем есть "op", который содержит тип транзакции, о которых я писал выше. Следовательно, мы можем реализовывать обработчики на различные события (INSERT/UPDATE/DELETE), прям как в триггерах MSSQL (это как раз то к чему мы и шли). Теперь, зная о том что мы можем получать тип транзакции мы реализуем декоратор, который будет принимать тип ожидаемой транзакции и если она совпадает то выполнять нужный нам скрипт в противном случае ничего не делать.

Пример реализации декоратора

class OnAction:
    def __init__(self, actions=[]):
        self.actions = actions

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            if args[1]['payload']['op'] in self.actions: return fn(*args, **kwargs)
        return call_func

Пример использования Декторатора 

class AddPurchaseToShedex(Module):

    def __init__(self, context = None, param = {}):
        super().__init__(context, param)

    @OnAction(['u', 'c'])
    def __call__(self, message):
        ...

Во-вторых надо разобраться с данными которые мы хотим получать. В JSON который отправляется в Kafka есть объект before  с данными которые были в таблицы до обновления либо после удаления и объект after с данными которые поступили при создании записи либо при ее изменении. Здесь все еще проще если к нам приезжает событие INSERT/UPDATE то новые данные будут лежать в after. Если к нам приезжает событие DELETE то данные которые были удалены будут лежать в before

Появление K8s

Хорошо, скрипты, которые реализуют логику Триггеров из БД у нас есть. Но один слушатель не справляется с нагрузкой которая поступает в топик Kafka. Да и к тому же, нужно администрировать большое количество таких триггеров и иметь возможность их скейлить. Что делать? Мы увеличили кол-во партиции и нам нужно увеличить кол-во слушателей в каждой группе. Тут 2 варианта: 

1. Мы можем запускать несколько процессов силами Python

2. Мы можем использовать K8s 

Первый вариант нас не устраивал. Во-первых, потому  что скрипт может вывалиться и его придется поднимать руками (не всегда, можно обойтись конечно и DockerCompose), во-вторых у нас таких скриптов становится много, и их нужно админить, обновлять и масштабировать. Если мы будем использовать например DockerCompose то это это сработает в рамках одного сервера. 

 deploy:
      replicas:3 #количество копий скрипта, которые вы хотите запустить

Но нам нужно иметь возможность масштабировать скрипты по разным серверам. И тут нам на помощь приходит K8s, который позволил нам : 

1. Автоматизировать деплой новых версий с использованием Gitlab-CI + ArgoCD. 

2. Упростить увеличение кол-ва слушателей через параметр REPLICASET Нам достаточно в values.yaml указать количество реалик скрипта и на каких серверах он будет отрабатывать (на всех или только на каком то определенном). 

3. Автоматизировать запуск скриптов при падении. Если какой-то из скриптов выпадет с ошибкой K8s самостоятельно его поднимет 

4. Облегчить введение новых нод в кластер K8s что позволяет нам практически неограниченно расширять наши мощности под новые и новые сервисы

Краткие итоги

Таким образом мы добились следующего

  1. За месяц работы, мы развернули новую инфраструктуру на основе K8s, настроили Kafka, реплику

  2. Перенесли все основные триггеры на MSA, что значительно разгрузило нашу CRM систему и мы избавились от блокировок и увеличили скорость ее работы.

  3. Создали инструмент для разработки новых БП, которые изначально были нам не доступны.

Комментарии (1)


  1. ptr128
    06.06.2024 16:49
    +1

    CDC уж очень громоздок и требует обслуживания. Поигрались и выкинули. Сделали свой велосипед, в виде таблички с идентификатором таблицы, идентификатором записи, типом операции и rowversion. Соответственно триггеры на все исходные таблицы топиков, которые в эту табличку и пишут. Ну и сервис только одной этой таблички, который уже публикует в топики.

    Про то, как добились консистентности данных можно отдельную статью писать. Кафка не транзакционная и это большая боль. Приходится отрабатывать ситуации, когда ссылка на элемент справочника есть, а сам элемент справочника до подписчика ещё не дошел или его содержимое неактуально.