В прошлой статье я рассказал про общую структуру проекта, про работу Kafka с CDC для получения данных из базы. Теперь пришло время поговорить про саму реализацию триггеров на Python. Как говорилось в предыдущей статье, мы будем реализовывать только триггеры Before (Instead Of останутся в базе без изменений). Итак, что же нам необходимо предусмотреть при разработке?
Каждый триггер будет запускаться как отдельный Deployment в K8s, то есть нужно предусмотреть удобный запуск триггеров.
Один триггер может обрабатывать только один топик из Kafka.
В каждом триггере должна быть возможность точечно настраивать фильтры по получаемым из Kafka данным.
Чтение Kafka
Для начала необходимо реализовать чтение топиков Kafka. На этом этапе мы знаем, что один топик — это данные из одной таблицы, и один триггер может обрабатывать данные только из одного топика. Таким образом, мы пришли к реализации ServiceLocator. Только мы будем реализовывать этот паттерн через декораторы.
@SubscribeKafkaTopik('Sales')
class TrSalesUpdate(ABCTrigger):
Таким образом, при запуске нашего сервиса мы будем сразу получать топик, который нужно будет слушать.
class MetaTriggers(type):
def __getitem__(cls, trigger_name):
return cls.__triggets__[trigger_name]
class SubscribeKafkaTopik(metaclass=MetaTriggers):
__triggets__ = {}
topok_name = None
def __new__(cls, topik):
if not hasattr(cls, 'instance'):
cls.instance = super(SubscribeKafkaTopik, cls).__new__(cls)
cls.instance.topok_name = topik
return cls.instance
def __call__(self, cls):
if cls.__name__ not in self.__triggets__:
self.__triggets__[cls.__name__] = cls(self.topok_name)
return cls
def __init__(self, topik):
self.topok_name = topik
@classmethod
def print(cls):
print(cls.__triggets__)
@classmethod
def get(cls, trigger_name):
if trigger_name not in cls.__triggets__: return None
return cls.__triggets__.get(trigger_name)
Теперь, как же запускать нужный нам топик? Учитывая, что мы реализовали регистрацию каждого триггера и прикрепили сопоставление триггера и топика, нам достаточно реализовать получение названия класса триггера, который мы хотим запустить через аргументы. Мы сделали это через ArgumentParser.
parser.add_argument('--trigger', help='Запускаемый триггер', default=os.getenv('TRIGGER_CLS'))
Пример запуска:
python main.py --trigger TrSalesUpdate
Далее мы получаем название триггера и, по названию триггера, начинаем слушать топик Kafka, который указан в декораторе SubscribeKafkaTopic.
SubscribeKafkaTopik[args.trigger].listen()
Что здесь происходит?
Во первых, мы получаем из SubscribeKafkaTopic класс, который был зарегистрирован как триггер с использованием декоратора @SubscribeKafkaTopic. Так как при запуске мы указали параметр --trigger TrSalesUpdate, то и на этапе SubscribeKafkaTopic[args.trigger] нам вернется класс TrSalesUpdate. Но откуда метод listen() и что он делает? Тут тоже все достаточно просто. Наш класс TrSalesUpdate, да и все другие классы, которые являются триггерами, унаследованы от базового класса ABCTrigger.
class ABCTrigger(ABC):
def __init__(self, topik_name = None):
if topik_name:
self.consumer = KafkaConsumer(
topik_name,
group_id=self.__class__.__name__,
api_version=(0,10),
bootstrap_servers=",".join(credentials['kafka']['bootstrap_servers']),
auto_offset_reset='latest',
value_deserializer=lambda x: loads(x.decode('utf-8')) if x is not None else None,
)
self.consumer.poll(timeout_ms=10000)
@abstractmethod
def call(self, message, key = None):
...
def listen(self):
print(f"Start Listen kafka {self.__class__.__name__}")
for message in self.consumer:
if message is None: continue
Thread(target=self.call, args=(message.value, message.key)).start()
В этом классе есть метод listen(), который начинает слушать топик Kafka. Таким образом, конструкция SubscribeKafkaTopic[args.trigger].listen() запускает получение сообщений из указанного топика.
Во-вторых, после получения сообщений из указанного топика Kafka, сообщение передаётся в метод call.
Thread(target=self.call, args=(message.value, message.key)).start()
Таким образом, в каждом триггере должна быть реализация метода call. Для того, чтобы реализовать логику обработки сообщений для каждого отдельного триггера
def call(self, message, key = None): ...
Теперь нужно рассмотреть, как обрабатывать различные типы событий, чтобы наш триггер срабатывал только при необходимых нам действиях.
Фильтры событий
Как говорилось ранее, нам доступны следующее события:
r - read - операция чтения данных из таблицы. Возникает в момент подключения Kafka-Connect к таблице.
c - create - операция создания записи, аналог insert
u - update - операция обновления записи
d - delete - операция удаления записи
Зная эти типы событий, нам необходимо сделать так, чтобы в дальнейшем наш триггер выполнялся при одном или нескольких событиях.
Например, у нас есть триггер обновления цены счета после того, как товар в счете был добавлен, изменен или удален. Соответственно, такой триггер должен срабатывать при следующих событиях: c, u, d.
Или другой пример: нам необходимо в момент добавления товара в счет добавлять себестоимость этого товара в таблицу с товарами в счете. Такой триггер уже должен работать только при событии создания (insert), то есть с типом c.
Реализовать такой фильтр по событиям мы решили через декораторы.
@FilterActionType('u', 'c')
def call(self, message, key = None):
...
Таким образом, если тип события не совпадает с типом, который был передан в декоратор, то метод call не сработает.
Скрытый текст
class FilterActionType:
def __init__(self, *actions):
self.actions = actions
def __call__(self, fn):
def call_func(*args, **kwargs):
if args[1]['payload']['op'] in self.actions: return fn(*args, **kwargs)
return False
return call_func
Однако это не всё. Что если триггер должен срабатывать только в том случае, если изменилось какое-то определённое поле либо список определённых полей? И только в том случае, если необходимые нам поля затронуты при изменении, выполнять триггер. Возьмём тот же пример с обновлением цены счета при изменении товара в счёте. Цену счёта нам нужно менять только если изменилось количество товара или изменилась цена товара. Но нам не нужно, чтобы триггер срабатывал, когда, например, изменено описание у этого товара в счёте.
Это мы также сделали через декоратор.
@FilterActionType('u')
@FilterUpdatedRow('Price', 'Quantity')
def call(self, message, key = None): ...
Таким образом, если условие не подойдет хотя бы по одному из декораторов, то метод выполнен не будет.
Скрытый текст
class FilterUpdatedRow:
def __init__(self, *columns):
self.columns = columns
def __call__(self, fn):
def call_func(*args, **kwargs):
for column in self.columns:
if args[1]['payload']['before'][column] != args[1]['payload']['after'][column]:
return fn(*args, **kwargs)
return False
return call_func
А теперь представим такую ситуацию. У нас есть отгрузочный документ, который закрывается, переводится на последнюю стадию, например, с типом "Закрыли". В таком случае нам необходимо проставить дату, когда был закрыт этот документ. При этом важно учитывать, что дату нужно проставлять только у тех документов, которые были закрыты только на складе A. Таким образом, нам нужно реализовать триггер, который сработает при следующих условиях:
Триггер только на событие обновления
Триггер только на изменение поле Status
Триггер только на измене поля Status на значение равное Закрыто и Склад должен быть равен значению A
Что мы имеем сейчас? У нас есть два фильтра, которые помогут нам реализовать два первых пункта. Но что делать с третьим? Правильно, реализовать!
Что мы видим из ТЗ? Нам нужно реализовать фильтр по значению строк. Это достаточно просто, так как у нас из Kafka приходят значения after и before, и нам достаточно проверить, равно ли значение указанного поля (в нашем случае Status = 'Закрыто' и Store = 'A') значению в блоке after. И тут мы можем пойти двумя путями.
Реализовать декоратор отдельно для каждого поля. То есть в каждый отдельный декоратор мы будем передавать проверку на значение каждого поля. И это будет работать, так как если хоть один из декораторов не сработает, метод call не выполнится. Это подходит под логическое "И". Однако стоит предусмотреть тот вариант, что в ТЗ может стоять "ИЛИ", и тогда нам этот вариант не подойдет.
Реализовать декоратор, который принимает тип фильтрации (or или and) и в зависимости от этого фильтрует данные. И мы решили выбрать именно этот путь. Не понятно? Сейчас посмотрим на реализацию, и всё станет понятнее.
Для начала посмотрим, как эта фильтрация будет выглядеть в триггере:
@FilterActionType('u')
@FilterUpdatedRow('Status')
@FilterRowData(
and_(
[
lambda record: record['after']['Status'] == 'Закрыто',
lambda record: record['after']['Store'] == 'A',
]
)
)
def call(self, message, key = None): ...
В @FilterActionType('u') проверяем, что тип события это обновление
В @FilterUpdatedRow('Status') проверяем, что во время обновления было изменено именно значение поля Status
В @FilterRowData реализуем проверку наших значений. Так как в задаче нужно проверить, что Status = 'Закрыто' и Store = 'A', мы реализовали проверку этих условий через and_, в который передаем лямбда-методы по проверке данных. Таким образом, мы можем реализовать любую логику проверки данных, которые мы получаем из Kafka.
Скрытый текст
class BaseFunc:
def __init__(self, filters = []):
self.filters = filters
Скрытый текст
class and_(BaseFunc):
def __call__(self, data):
if not self.filters: return True
result_filters = []
for idx, filter_ in enumerate(self.filters):
if (idx >= 2) & (sum(result_filters) != idx): return False
result_filters.append(int(filter_(data)))
return sum(result_filters) == len(self.filters)
class or_(BaseFunc):
def __call__(self, data):
if not self.filters: return True
for filter_ in self.filters:
if filter_(data): return True
return False
Запуск проверки происходит в декораторе FilterRowData. И если фильтр, который мы передали в конструктор (в данном случае and_), вернёт False, то метод call не отработает.
class FilterRowData:
def __init__(self, filter_func: BaseFunc = None):
self.filter_func = filter_func
def __call__(self, fn):
def call_func(*args, **kwargs):
if self.filter_func is None: return fn(*args, **kwargs)
if self.filter_func(args[1]['payload']): return fn(*args, **kwargs)
return False
return call_func
Итоги
Таким образом, мы реализовали основные фильтры для данных, которые получаем из топиков Kafka, и в каждом из запущенных триггеров можно точечно настраивать фильтры под те данные, которые мы ожидаем получить и с которыми будем работать в данном классе. При этом мы реализовали достаточно простую регистрацию триггеров, что позволяет легко запускать нужный обработчик.
Однако остаются и задачи в бэклоге. Например, нужно сделать так, чтобы одна запись обрабатывалась триггером лишь однажды. То есть, если запись с одним и тем же идентификатором обновлялась несколько раз, то триггер для этой записи должен сработать только один раз. Или же реализовать чтение из базы слейва, а запись в базу мастера, чтобы разгрузить рабочую базу, с которой мы будем работать больше всего.
Комментарии (3)
KING_TRITON
29.08.2024 06:53Отличная статья! Очень подробно и понятно расписаны шаги реализации. Вопрос: как вы решаете вопрос с обработкой дубликатов в Kafka?
dude_sam
Наверное, невнимательно читал и оттого совсем не понял, зачем вам в принципе триггеры?
Если для аудита изменений, то чем не угодили temporal tables?
KainoRhine Автор
У нас на триггерах запускаются бизнес-процессы, которые должны отрабатывать сразу, как только запись появляется (insert), изменяется (update) или удаляется (delete). Ранее все это работало в MSSQL, и триггеры висели на таблицах. Однако со временем таких триггеров стало очень много, и помимо обновления данных в базе нам нужно было отправлять еще и различные уведомления по почте, в Telegram и WhatsApp. Поэтому со временем база стала работать медленнее, так как мы ждали, пока отработают все триггеры при каждой нашей транзакции insert/update/delete. Поэтому мы и сделали такую систему.