Привет! Меня зовут Сергей Климов, я работаю Python-разработчиком, специализируюсь на написании бэкенда на фреймворке Django. Идея данной статьи появилась из-за желания написать простой и понятный ETL-процесс без итерации по спискам и прочей “вложенности”, попутно решая вопросы рационального использования ОЗУ. Уверен, что мои наработки будут полезны для питонистов, которые столкнулись с аналогичной задачей.
Зачем это надо
Для начала обозначим предметную область. Нас интересует ETL-процесс (extract, transform, search) реализованный через паттерн “Цепочка обязанностей”. Мы разработаем в качестве примера три обработчика, которые будут передавать данные последовательно из функции в функцию. Каждый последующий обработчик решает, может ли он обработать запрос сам и стоит ли передавать запрос дальше по цепи.
Перед тем, как начать
Каждый наш обработчик будет представлять из себя функцию, которая будет являться вызывающей функцией и/или функцией-генератором.
В первом приближении наша задача создать итератор, который будет состоять из этих функций. Это мы реализуем посредством декоратора.
В вызывающем (или передающем) коде передача данных в следующий обработчик будет через метод send(). Принимать данные будем через выражение yield.
Также мы будем использовать распаковку кортежей, сопоставление match/case.
Поэтому рекомендую ознакомиться/повторить данные темы и термины, чтобы понимать как всё будет работать.
Постановка задачи
Вымышленная задачка, главное “погонять” данные из функции в функцию.
В базе данных есть таблица, содержащая целые числа. ETL-процесс должен пройтись по всем записям таблицы, возвести каждое число в квадрат и отобразить в консоли. Для каждого четного числа вывести информационное сообщение "the square of an even number". Если число из базы данных равно 3, то никаких действий над ним мы не делаем.
Первая функция
Задача первой функции “цепочки обязанностей” - передать значение в генератор путем вызова метода send(). В нашем вымышленном примере мы делаем sql-запрос к таблице, содержащей числовые строки и по одной передаем их в генератор.
import psycopg2
from psycopg2.extras import DictCursor
SQL = """select id, number from etl.source"""
def extract(batch):
dbs = dict(dbname='demo', user='sergei', password='sergei', host='localhost')
with psycopg2.connect(**dbs) as connection:
with connection.cursor(cursor_factory=DictCursor) as cursor:
cursor.execute(SQL)
record = cursor.fetchone() # можно использовать fetchmany, чтобы извлекать данные "пачками"
while record:
batch.send(record)
record = cursor.fetchone() # можно использовать fetchmany, чтобы извлекать данные "пачками"
Декоратор
Теперь пишем обертку для наших функций-генераторов для обеспечения возможности принимать данные. Задача - запустить генератор. Для этого мы используем next() встроенную функция Python, которая возвращает следующий элемент в итераторе.
from functools import wraps
from functools import wraps
def coroutine(func):
@wraps(func)
def inner(*args, **kwargs):
fn = func(*args, **kwargs)
next(fn)
return fn
return inner
Вторая и все промежуточные функции
У всех промежуточных функций “цепочки обязанностей” три задачи - принять данные, обработать их и передать в следующую функцию. В продолжении нашего вымышленного примера:
@coroutine
def transform(batch):
while record := (yield):
new_number = record["number"] ** 2
if record["number"] % 2 == 0:
foo = "an even number"
elif record["number"] == 3:
print("skip load stage")
continue
else:
foo = 0
batch.send((new_number, foo))
Обратите внимание, что yield в скобках предполагает, что будет получен итерируемый объект. Иначе вы словите исключение StopItearation. Ветками if/elif/else показано, что можно управлять наборами данных, которые будут направлены на следующий этап. А также через continue можно вообще прервать выполнение “цепочки обязанностей”.
Заключительная функция
Технологически это функция из предыдущего раздела только без инструкции send. Т.е. мы ничего далее не отправляем. Завершаем наш вымышленную цепочку:
@coroutine
def load():
while subject := (yield):
match subject:
case (int(number), str(bar)):
print("the square of", bar, number)
case (int(number), int(bar)):
print(number)
case _:
raise SyntaxError(f"Unknown structure of {subject=}")
Собираем все вместе
Ключевое слово “собираем”, а не “запускаем”. Вот как выглядит код запуска наших функций:
extract(
transform(
load()
)
)
Т.е. порядок вложенности (читаем снаружи внутрь) регулирует порядок расположения функций в итераторе. При рефакторинге кода получаем:
unloads = load()
multiplication = transform(unloads)
extract(multiplication)
Это несколько сбивает с толку, т.к. запуск кода по факту идёт “снизу вверх”. Имейте это ввиду, т.к. общепринят именно второй вариант кода. Иначе никак, просто представьте себе как будет выглядеть первый вариант кода если количество этапов будет более пяти.
Репозиторий с кодом доступен по ссылке.
Что далее
При первом знакомстве с кодом возникает ощущение его асинхронности (спасибо декоратору с говорящим названием coroutine). Но это не так. Все действия выполняются последовательно. В следующий раз мы попытаемся добавить асинхронности в наш код.
Желаю успехов и с удовольствием отвечу на ваши вопросы.
MentalBlood
Если я правильно понял задачу, можно еще через reduce:
SergeyKlimov_Creator Автор
Надо попробовать...