Автор работал в различных дата-инженерных проектах и иногда проекты представляют собой набор модулей без логики и без общего подхода. Поэтому цель статьи - разработать этот общий подход и заодно поупражняться вместе с читателем в его создании.

Подводящие идеи

В Enterprise мы работаем с трансформациями данных, поэтому: 

  1. Краеугольный камень - это бизнес-процесс, некий Flow, который преобразует одну или несколько таблиц. 

  2. Таблица - это вспомогательная сущность, обслуживающая бизнес. Попытка сделать таблицы основными сущностями приведет к размыванию бизнес-логики.

  3. Потоки и таблицы удобно реализовывать как классы.

  4. Так как бизнес-логика может быть сложной, её разумно разбить на шаги (steps) — функции.

Важно: за классом таблицы может стоять что угодно — например, Spark DataFrame. В этой статье будем ориентироваться именно на Spark.

Основные идеи подхода

  1. Создать базовый класс Flow, в котором будет реализована логика фреймворка

  2. Описывать конкретные пайплайны через наследование:

    class MyFlow(Flow):    
      ...
  3. Разбивать процесс на шаги — методы класса, которые можно переиспользовать

  4. Делать описание шагов через декораторы

Подход - Class-based pipeline orchestration на декораторах

Рассмотрим пример описания. У нас есть поток MyFlow, унаследованный от Flow, который мы опишем далее, пока это не важно. И есть одна функция, для которой мы делаем описание, что это: шаг потока, что она принимает на вход таблицу MyTable, и выходом является таблица MyTable2:

class MyFlow(Flow):
    @classmethod
    @Flow.step()
    @Flow.input([MyTable])
    @Flow.output(MyTable2)
    def step_one(cls, context: Context) -> DataFrame:
        print(f" Step 1: создаём/обновляем MyTable,{context.id}")

Итого: у нас есть один шаг step_one. Что делают декораторы:

  • @Flow.step() — помечает метод как шаг pipeline

  • @Flow.input([MyTable]) — описывает входные данные

  • @Flow.output(MyTable2) — описывает результат

Важно: сами декораторы не выполняют логику, они только добавляют мета-информацию.

Context — контейнер состояния

Наша функция-шаг имеет на входе  некий  context:Context и возвращаемое значение DataFrame - это стандартный тип спарка для табличек. Что такое Context?

Тут надо решить - как мы будем работать с аргументами. Лично мне нравится некий обобщенный вид, когда не надо писать длинные списки параметров. Это имеет свои плюсы и минусы, решать вам. Но для фреймворка это имеет смысл.

Возможная реализация:

from pydantic import BaseModel
class Context(BaseModel):
    # допустим глобальный конфиг
    config: Dict[Type[Config], Config] = {}
    # текущие таблички, с которыми работаем
    data: Dict[Union[str, Type[Table]], DataFrame] = {}
    # какие-то другие переменные
    diff: Dict[Any, Any] = {}

Плюсы такого подхода:

  • меньше аргументов в функциях

  • легко расширять

  • единая точка передачи состояния

Минусы:

  • менее явные зависимости

Таблицы

В Spark схема описывается через StructType:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("item", StringType(), nullable=False),
    StructField("loc", StringType(), nullable=False),
    StructField("qty", IntegerType(), nullable=True)
])

df = spark.createDataFrame([], schema=schema)

Часто для описания табличек используется SQLAlchemy, поэтому может быть хорошей идеей использовать именно его, и только конвертировать это описание. Будем использовать декларативную нотацию версий 1.x, 

from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

class Table(Base):
    __abstract__  = True  # чтобы не создавать таблицу для этого класса

class MyTable(Table):
    __tablename__  = "my_table"
    id = Column(Integer, primary_key=True, nullable=True)
    name = Column(String(50), nullable=True)
    age = Column(Integer, nullable=True)

Объявляем табличку с 3 колонками, первая - primary key

Теперь надо определить класс Table, чтобы получать описание в spark-стиле, для этого нам нужен конвертер

def _convert_type(sqlalchemy_type) -> T.DataType:
    # Упрощённая карта типов
   
    if isinstance(sqlalchemy_type, Integer):
        return T.IntegerType()
    if isinstance(sqlalchemy_type, String):
        return T.StringType()
    if isinstance(sqlalchemy_type, Float):
        return T.FloatType()
    if isinstance(sqlalchemy_type, Boolean):
        return T.BooleanType()
    if isinstance(sqlalchemy_type, Date):
        return T.DateType()
    if isinstance(sqlalchemy_type, DateTime):
        return T.TimestampType()
    if isinstance(sqlalchemy_type, Numeric):
        return T.DoubleType()

    raise ValueError(f"Unsupported type: {sqlalchemy_type}")


class Table(Base):
    __abstract__ = True  # чтобы не создавать таблицу для этого класса

    @classmethod
    def get_schema(cls) -> T.StructType:
        fields = []
        for column in cls.__table__.columns:
            spark_type = _convert_type(column.type)
            fields.append(
                T.StructField(column.name, spark_type, column.nullable)
            )
        return T.StructType(fields)

можно использовать код вида MyTable.get_schema(), чтобы достать описание таблицы в спарк-стиле. Естественно можем использовать полиморфизм и делать свои кастомные таблички

Можно полностью переопределить метод:

class MyTable(Table):
    __tablename__ = "my_table"

    id = Column(Integer)
    name = Column(String)

    @classmethod
    def get_schema(cls) -> T.StructType:
        return T.StructType([
            T.StructField("id", T.LongType(), False),
            T.StructField("name", T.StringType(), True),
            T.StructField("extra_col", T.StringType(), True),
        ])

 здесь мы напрямую возвращаем StructType. А можно использовать дефолтную логику и добавлять/менять поля поверх неё.

class MyExtendedTable(Table):
    __tablename__ = "ext"

    id = Column(Integer)
    name = Column(String)

    @classmethod
    def get_schema(cls):
      
        # возьмем SQLAlchemy-версию
        schema = super().get_schema()   

        # добавим вычисляемую колонку
        fields = schema.fields + [
            T.StructField("sys_load_ts", T.TimestampType(), False)
        ]

        return T.StructType(fields)

Крупноблочно мы описали, теперь займемся его реализацией. Основа фреймворка - декораторы, давайте немного вспомним теорию. Конструкция:

@classmethod
@Flow.step(order=1)
def step_one(cls, context):
    pass

Разворачивается так:

  1. Flow.step(order=1) → возвращает декоратор

  2. декоратор применяется к функции

  3. результат оборачивается в classmethod

Важно: порядок применения — снизу вверх

Основные идеи:

  1. Сами декораторы только навешивают некую внутреннюю мета-информацию _step_meta на функцию, в которой будем определять порядок, таблицы и т.д.

  2. Сборка пайплайна в специальном методе __init_subclass__, запускается в последний момент, после всех декораторов.

  3. Ключевая идея — собрать шаги автоматически при создании класса.

Напомню, что __init_subclass__ это специальный метод введенный в Python 3.6 (PEP 487). Это позволяет базовому классу настраивать инициализацию своего класса.

from functools import wraps

class Flow:
    def __init_subclass__(cls):
        # 1) переопределяем переменные для каждого подкласса, важно!
        cls.steps = []
        cls.step_meta = {}
        # 2)  обходим все атрибуты класса, ищем наши шаги
        for name, attr in cls.__dict__.items():
            func = None
            if isinstance(attr, classmethod):
                func = attr.__func__
            elif callable(attr):
                func = attr
            # 3) добавляем мета-инфорцацию
            if func and hasattr(func, "_step_meta"):
                cls.steps.append(name)
                cls.step_meta[name] = func._step_meta
        # 4) и напоследок сортируем по order
        cls.steps.sort(key=lambda n: cls.step_meta[n]["order"])

Плюс:

  • каждый Flow получает свой список шагов

  • нет смешивания состояния между классами

Определим наконец наш декоратор, который превращает функцию в шаг. Основная наша задача - добавить мета-информацию на функцию

@classmethod
def step(cls, order):
    def decorator(func):
        func._step_meta = getattr(func, "_step_meta", {})
        func._step_meta["order"] = order
        return func
    return decorator

И запуск реализуем так - проходимся по всем шагам, получаем функцию по имени шага, и запускаем ее:

@classmethod
def run(cls, context):
    print(f"\n=== RUN FLOW {cls.__name__} ===")
    for step in cls.steps:
        print("META:", step, cls.step_meta[step])
        # получаем функцию и вызываем ее с context
        getattr(cls, step)(context)

Файл конфигурации

Используем pydantic. Для работы нам нужно наследоваться от класса , который реализует всю магию.

from pydantic import BaseModel

class Config(BaseModel):
    param1: str = "A"

Плюсы:

  • валидация типов

  • автодополнение в IDE

  • удобная сериализация

Удобно использовать yaml-формат для параметром, рекомендую использовать его, функции для сохранения и загрузки в yaml конфигурационного файла: 

def save_config(config: Config, filepath: str):
    yaml.dump(config.dict(), open(filepath, 'w'))

def load_config(filepath: str) -> Config:
    return Config(**yaml.safe_load(open(filepath)))

SQL как основной инструмент трансформаций

Идея:

  • сначала SQL

  • если не хватает — DataFrame API

Плюсы:

  • быстрее писать

  • понятнее аналитикам

Для спарка, как мы знаем, есть возможность выполнять непосредственно SQL

df = spark.range(10)
df.createOrReplaceTempView("my_view")
df = spark.sql("select id, 1 as asd, 2 as ewq from my_view")
df.show()

Основная идея - отдельный шаг, в котором либо автоматически, либо мы сами вызовем некоторую функцию, которая загрузит параметризованный SQL, подставит параметры и выполнит.

class MyFlow(Flow):

    @classmethod
    @Flow.step(order=3)
    @Flow.input([MyTable])
    @Flow.output(AnotherTable)
    # Либо декоратор для исполнения файла
    @Flow.sql("step_three.sql")
    def step_three(cls, context: Context):
        context.data[MyTable].show()
        # либо сами вызываем 
        df = cls.execute_sql(context, "step_three.sql",vars={"id":1})
        df.show()

И сам файл для выполнения step_three.sql:

select * from my_table
where id={{id}}

Первое, что нам нужно - это декоратор, в котором мы только передаем мета-информацию в функцию

@classmethod
def sql(cls, file: str, vars: Dict[str, Any] = None):
    """
    Вешается на шаг:
    @Flow.sql("step1.sql", vars={"x":1})
    """
    def decorator(func):
        func._step_meta = getattr(func, "_step_meta", {})
        func._step_meta["sql"] = {
            "file": file,
            "vars": vars
        }
        return func
    return decorator

Понадобится вспомогательная функция создания временных view:

def camel_to_snake(name: ...) -> str:
    """
    Приведения названия таблицы в нужный вид.    
    Пример:
        Abc -> abc
    """
    
@classmethod
def create_temp_views(cls, context: Context, tables: List[Any]):
    """
    Создаёт временные spark view  для SQL
    """
    for table_class in tables:
        df = context.data.get(table_class, None)
        table_name = table_class if isinstance(table_class, str) else camel_to_snake(table_class.__name__)
        if df is not None:    
            df.createOrReplaceTempView(table_name)

Функция исполнения sql-кода из файла, используем подход jinja2 для подстановки переменных

from jinja2 import Template

@classmethod
def execute_sql(cls, context: Context, *, file: str, vars: Dict[str, Any]):
    # читаем файл 
    sql_query = cls.read_sql(file)
    # Подставляем переменные
    if vars:
        sql_query = Template(sql_query).render(vars)

    result_df = context.spark.sql(sql_query)
    return result_df

И блок исполнения теперь выглядит так:

@classmethod
    def run(cls, context:Context):
        print(f"\n=== RUN FLOW {cls.__name__} ===")
        for i, step in enumerate(cls.steps):
            print("META:", step, cls.step_meta[step], i)
            cls.current_step = i
           
            meta = cls.step_meta[step]
            # ---------- SQL ----------
            if "sql" in meta:
                sql_meta = meta["sql"]
 
                cls.create_temp_views(context, meta["input"])
        
                result = cls.execute_sql(
                    context,
                    file=sql_meta["file"],
                    vars=sql_meta.get("vars")
                )
                result.show()
                # сохраняем результат
                result_table =  meta["output"][0]
                context.data[result_table] = result

            # Выполняем шаг
            getattr(cls, step)(context)

Итог

Мы получили простой декларативный фреймворк:

  • Flow = бизнес-процесс

  • Step = шаг трансформации

  • Context = состояние

  • Таблицы = декларативные модели

При этом:

  • структура единообразная

  • легко расширять

Что дальше

В статье разобраны базовые идеи. Возможные улучшения:

  • dependency graph вместо order

  • валидация входов/выходов

  • retry и error handling

  • execution backend (Spark / SQL / Pandas)

  • реализация через дескрипторы вместо __init_subclass__

Немного про dependency graph

Текущая реализация использует явный порядок выполнения шагов (order). Однако в более зрелых системах orchestration используется dependency graph (DAG), где шаги определяются через зависимости друг от друга. Это позволяет:

  • автоматически вычислять порядок выполнения

  • запускать независимые шаги параллельно

  • упростить поддержку пайплайна

Можно будет делать так:

@Flow.step(depends_on=["step_one", "step_two"])
def step_three(cls, context):
    ...

Комментарии (7)


  1. levge
    18.04.2026 17:29

    Интересно, спасибо!

    Но вот это немного смущает

    «primary_key=True, nullable=True)»

    Ключ присутствовать должен всегда, иначе зачем он


    1. eager_igor Автор
      18.04.2026 17:29

      Согласен, крипово) но ладно, это только пример)


  1. Ninil
    18.04.2026 17:29

    Декларативный подход имеет очень много неявных ограничений если его применять везде. Эти ограничения каждое по отдельности не критично в каждом конкретном случае, но в enterprise-grade платформе данных накапливаются и в какой то момент, по достижении критической массы становится очень больно.

    Ну и стоить трансформации на каскадах temp view … Это верх непрофессионализма дата инженера.

    …иногда проекты представляют собой набор модулей без логики и без общего подхода…

    Это проблема организационная и управленческо-архитектурная. И она не решается придумыванием (N+1)-го подхода.


    1. eager_igor Автор
      18.04.2026 17:29

      Конечно, каждый подход имеет плюсы/минусы. Насчет N+1 - мне все же кажется наоборот это плюс, можно выбирать, комбинировать и т.д. Насчет каскадов - да, спасибо, что подсветили, в статье не упоминал этого. Конечно, надо как-то materialization делать, абсолютно согласен, для каких-то шагов, декларативно или ручками


      1. Ninil
        18.04.2026 17:29

        Для промежуточных шагов не нужна никакая materialization. Архитектура дата-слоев должна оставаться чистой и не зависеть от того, как и чем вы эти слои заполняете. Каким бы маркетинговым термином ее не назвали, есть по большому счету всего 3 слоя: stage - core - marts (последний иногда ещё делят на два подслоя: базовые и прикладные витрины, есть в этом есть реальная потребность),и материлизоваться должны только объекты этих слоев. Широкое использование каскадов промежуточных объектов говорит о нехватки компетенций команды, непродуманой архитектуре или бездумному использованию какого-то фреймворка (например dbt, DLT и т.п.).


        1. eager_igor Автор
          18.04.2026 17:29

          Нет, тут нет привязки шагов к слоям\архитектурам, шаг - квант трансформаций и все, Есть ооочень сложные трансформации/вычисления, поэтому и разбивка на шаги и возможность их переиспользования


          1. Ninil
            18.04.2026 17:29

            я указыыаю на то, что все это можно (и нужно) делать без создания промежуточных (временных) объектов. Особенно если используете Spark. Если какой-то шаг предполагается для переиспользования - это отлично ложится в концепт “базовых и прикладных витрин”. Но если у вас появляется “многослойный пирог” из “промежуточных переиспользуемых объектов” - это повод задуматься о гапах в аналитике и архитектуре. В 90% двух слоев витрин: базовые (переиспользуемые) и прикладные (финальная аналитика) - хватает за глаза. Оставшиеся 9% решаются просто добавлением некоторых зависимостей между прикладными (что тоже нормально). Ну и 1% - очень сложные случаи, которые очень редки.