В прошлой статье я рассказал про общую структуру проекта, про работу Kafka с CDC для получения данных из базы. Теперь пришло время поговорить про саму реализацию триггеров на Python. Как говорилось в предыдущей статье, мы будем реализовывать только триггеры Before (Instead Of останутся в базе без изменений). Итак, что же нам необходимо предусмотреть при разработке?

  1. Каждый триггер будет запускаться как отдельный Deployment в K8s, то есть нужно предусмотреть удобный запуск триггеров.

  2. Один триггер может обрабатывать только один топик из Kafka.

  3. В каждом триггере должна быть возможность точечно настраивать фильтры по получаемым из Kafka данным.

Чтение Kafka

Для начала необходимо реализовать чтение топиков Kafka. На этом этапе мы знаем, что один топик — это данные из одной таблицы, и один триггер может обрабатывать данные только из одного топика. Таким образом, мы пришли к реализации ServiceLocator. Только мы будем реализовывать этот паттерн через декораторы.

@SubscribeKafkaTopik('Sales')
class TrSalesUpdate(ABCTrigger):

Таким образом, при запуске нашего сервиса мы будем сразу получать топик, который нужно будет слушать.

class MetaTriggers(type):
    def __getitem__(cls, trigger_name):
        return cls.__triggets__[trigger_name]

class SubscribeKafkaTopik(metaclass=MetaTriggers):
    __triggets__ = {}
    topok_name = None

    def __new__(cls, topik):
        if not hasattr(cls, 'instance'):
            cls.instance = super(SubscribeKafkaTopik, cls).__new__(cls)
            cls.instance.topok_name = topik
        return cls.instance


    def __call__(self, cls):
        if cls.__name__ not in self.__triggets__:
            self.__triggets__[cls.__name__] = cls(self.topok_name)
        return cls


    def __init__(self, topik):
        self.topok_name = topik

    @classmethod
    def print(cls):
        print(cls.__triggets__)

    @classmethod
    def get(cls, trigger_name):
        if trigger_name not in cls.__triggets__: return None
        return cls.__triggets__.get(trigger_name)

Теперь, как же запускать нужный нам топик? Учитывая, что мы реализовали регистрацию каждого триггера и прикрепили сопоставление триггера и топика, нам достаточно реализовать получение названия класса триггера, который мы хотим запустить через аргументы. Мы сделали это через ArgumentParser.

parser.add_argument('--trigger', help='Запускаемый триггер', default=os.getenv('TRIGGER_CLS'))

Пример запуска:

python main.py --trigger TrSalesUpdate

Далее мы получаем название триггера и, по названию триггера, начинаем слушать топик Kafka, который указан в декораторе SubscribeKafkaTopic.

 SubscribeKafkaTopik[args.trigger].listen()

Что здесь происходит?

Во первых, мы получаем из SubscribeKafkaTopic класс, который был зарегистрирован как триггер с использованием декоратора @SubscribeKafkaTopic. Так как при запуске мы указали параметр --trigger TrSalesUpdate, то и на этапе SubscribeKafkaTopic[args.trigger] нам вернется класс TrSalesUpdate. Но откуда метод listen() и что он делает? Тут тоже все достаточно просто. Наш класс TrSalesUpdate, да и все другие классы, которые являются триггерами, унаследованы от базового класса ABCTrigger.

class ABCTrigger(ABC):


    def __init__(self, topik_name = None):
        if topik_name:
            self.consumer = KafkaConsumer(
                topik_name,
                group_id=self.__class__.__name__,
                api_version=(0,10),
                bootstrap_servers=",".join(credentials['kafka']['bootstrap_servers']),
                auto_offset_reset='latest',
                value_deserializer=lambda x: loads(x.decode('utf-8')) if x is not None else None,
            )
            self.consumer.poll(timeout_ms=10000)

    @abstractmethod
    def call(self, message, key = None):
        ...


    def listen(self):
        print(f"Start Listen kafka {self.__class__.__name__}")
        for message in self.consumer:
            if message is None: continue
            Thread(target=self.call, args=(message.value, message.key)).start()

В этом классе есть метод listen(), который начинает слушать топик Kafka. Таким образом, конструкция SubscribeKafkaTopic[args.trigger].listen() запускает получение сообщений из указанного топика.

Во-вторых, после получения сообщений из указанного топика Kafka, сообщение передаётся в метод call.

Thread(target=self.call, args=(message.value, message.key)).start()

Таким образом, в каждом триггере должна быть реализация метода call. Для того, чтобы реализовать логику обработки сообщений для каждого отдельного триггера

def call(self, message, key = None): ...

Теперь нужно рассмотреть, как обрабатывать различные типы событий, чтобы наш триггер срабатывал только при необходимых нам действиях.

Фильтры событий

Как говорилось ранее, нам доступны следующее события:

  • r - read - операция чтения данных из таблицы. Возникает в момент подключения Kafka-Connect к таблице.

  • c - create - операция создания записи, аналог insert

  • u - update - операция обновления записи

  • d - delete - операция удаления записи

Зная эти типы событий, нам необходимо сделать так, чтобы в дальнейшем наш триггер выполнялся при одном или нескольких событиях.

Например, у нас есть триггер обновления цены счета после того, как товар в счете был добавлен, изменен или удален. Соответственно, такой триггер должен срабатывать при следующих событиях: c, u, d.

Или другой пример: нам необходимо в момент добавления товара в счет добавлять себестоимость этого товара в таблицу с товарами в счете. Такой триггер уже должен работать только при событии создания (insert), то есть с типом c.

Реализовать такой фильтр по событиям мы решили через декораторы.

@FilterActionType('u', 'c')
def call(self, message, key = None):
	...

Таким образом, если тип события не совпадает с типом, который был передан в декоратор, то метод call не сработает.

Скрытый текст
class FilterActionType:
    def __init__(self, *actions):
        self.actions = actions

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            if args[1]['payload']['op'] in self.actions: return fn(*args, **kwargs)
            return False
        return call_func

Однако это не всё. Что если триггер должен срабатывать только в том случае, если изменилось какое-то определённое поле либо список определённых полей? И только в том случае, если необходимые нам поля затронуты при изменении, выполнять триггер. Возьмём тот же пример с обновлением цены счета при изменении товара в счёте. Цену счёта нам нужно менять только если изменилось количество товара или изменилась цена товара. Но нам не нужно, чтобы триггер срабатывал, когда, например, изменено описание у этого товара в счёте.

Это мы также сделали через декоратор.

@FilterActionType('u')
@FilterUpdatedRow('Price', 'Quantity')
def call(self, message, key = None): ...

Таким образом, если условие не подойдет хотя бы по одному из декораторов, то метод выполнен не будет.

Скрытый текст
class FilterUpdatedRow:
    def __init__(self, *columns):
        self.columns = columns

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            for column in self.columns:
                if args[1]['payload']['before'][column] != args[1]['payload']['after'][column]:
                    return fn(*args, **kwargs)
            return False
        return call_func

А теперь представим такую ситуацию. У нас есть отгрузочный документ, который закрывается, переводится на последнюю стадию, например, с типом "Закрыли". В таком случае нам необходимо проставить дату, когда был закрыт этот документ. При этом важно учитывать, что дату нужно проставлять только у тех документов, которые были закрыты только на складе A. Таким образом, нам нужно реализовать триггер, который сработает при следующих условиях:

  1. Триггер только на событие обновления

  2. Триггер только на изменение поле Status

  3. Триггер только на измене поля Status на значение равное Закрыто и Склад должен быть равен значению A

Что мы имеем сейчас? У нас есть два фильтра, которые помогут нам реализовать два первых пункта. Но что делать с третьим? Правильно, реализовать!

Что мы видим из ТЗ? Нам нужно реализовать фильтр по значению строк. Это достаточно просто, так как у нас из Kafka приходят значения after и before, и нам достаточно проверить, равно ли значение указанного поля (в нашем случае Status = 'Закрыто' и Store = 'A') значению в блоке after. И тут мы можем пойти двумя путями.

  1. Реализовать декоратор отдельно для каждого поля. То есть в каждый отдельный декоратор мы будем передавать проверку на значение каждого поля. И это будет работать, так как если хоть один из декораторов не сработает, метод call не выполнится. Это подходит под логическое "И". Однако стоит предусмотреть тот вариант, что в ТЗ может стоять "ИЛИ", и тогда нам этот вариант не подойдет.

  2. Реализовать декоратор, который принимает тип фильтрации (or или and) и в зависимости от этого фильтрует данные. И мы решили выбрать именно этот путь. Не понятно? Сейчас посмотрим на реализацию, и всё станет понятнее.

Для начала посмотрим, как эта фильтрация будет выглядеть в триггере:

@FilterActionType('u')
@FilterUpdatedRow('Status')
@FilterRowData(
      and_(
          [
              lambda record: record['after']['Status'] == 'Закрыто',
              lambda record: record['after']['Store'] == 'A',
          ]
      )
 )
 def call(self, message, key = None): ...

В @FilterActionType('u') проверяем, что тип события это обновление

В @FilterUpdatedRow('Status') проверяем, что во время обновления было изменено именно значение поля Status

В @FilterRowData реализуем проверку наших значений. Так как в задаче нужно проверить, что Status = 'Закрыто' и Store = 'A', мы реализовали проверку этих условий через and_, в который передаем лямбда-методы по проверке данных. Таким образом, мы можем реализовать любую логику проверки данных, которые мы получаем из Kafka.

Скрытый текст
class BaseFunc:
    def __init__(self, filters = []):
        self.filters = filters

Скрытый текст
class and_(BaseFunc):


    def __call__(self, data):
        if not self.filters: return True
        result_filters = []
        for idx, filter_ in enumerate(self.filters):
            if (idx >= 2) & (sum(result_filters) != idx): return False
            result_filters.append(int(filter_(data)))

        return sum(result_filters) == len(self.filters)

class or_(BaseFunc):


    def __call__(self, data):
        if not self.filters: return True
        for filter_ in self.filters:
            if filter_(data): return True

        return False

Запуск проверки происходит в декораторе FilterRowData. И если фильтр, который мы передали в конструктор (в данном случае and_), вернёт False, то метод call не отработает.

class FilterRowData:
    def __init__(self, filter_func: BaseFunc = None):
        self.filter_func = filter_func

    def __call__(self, fn):
        def call_func(*args, **kwargs):
            if self.filter_func is None: return fn(*args, **kwargs)
            if self.filter_func(args[1]['payload']): return fn(*args, **kwargs)
            return False
        return call_func

Итоги

Таким образом, мы реализовали основные фильтры для данных, которые получаем из топиков Kafka, и в каждом из запущенных триггеров можно точечно настраивать фильтры под те данные, которые мы ожидаем получить и с которыми будем работать в данном классе. При этом мы реализовали достаточно простую регистрацию триггеров, что позволяет легко запускать нужный обработчик.

Однако остаются и задачи в бэклоге. Например, нужно сделать так, чтобы одна запись обрабатывалась триггером лишь однажды. То есть, если запись с одним и тем же идентификатором обновлялась несколько раз, то триггер для этой записи должен сработать только один раз. Или же реализовать чтение из базы слейва, а запись в базу мастера, чтобы разгрузить рабочую базу, с которой мы будем работать больше всего.

Комментарии (3)


  1. dude_sam
    29.08.2024 06:53

    Наверное, невнимательно читал и оттого совсем не понял, зачем вам в принципе триггеры?
    Если для аудита изменений, то чем не угодили temporal tables?


    1. KainoRhine Автор
      29.08.2024 06:53

      У нас на триггерах запускаются бизнес-процессы, которые должны отрабатывать сразу, как только запись появляется (insert), изменяется (update) или удаляется (delete). Ранее все это работало в MSSQL, и триггеры висели на таблицах. Однако со временем таких триггеров стало очень много, и помимо обновления данных в базе нам нужно было отправлять еще и различные уведомления по почте, в Telegram и WhatsApp. Поэтому со временем база стала работать медленнее, так как мы ждали, пока отработают все триггеры при каждой нашей транзакции insert/update/delete. Поэтому мы и сделали такую систему.


  1. KING_TRITON
    29.08.2024 06:53

    Отличная статья! Очень подробно и понятно расписаны шаги реализации. Вопрос: как вы решаете вопрос с обработкой дубликатов в Kafka?