Привет! На связи Никита Ильин из Spectr, Backend-разработчик с опытом более 5 лет.
Один из проектов, с которым мы работаем, — IBP-платформа для планирования и прогнозирования спроса и продаж в ритейле. В статье поговорим о конкретной реализации для одной из задач в рамках этой платформы на Python и Django. При этом сама концепция может быть реализована абсолютно на любом фреймворке или платформе: Spring, .NET, Laravel.
Описание кейса
Мы разрабатываем IBP-платформу для крупной корпорации, где на основе данных, которые поступают из смежных систем, строятся прогнозы и аналитика. И одна из областей — работа с огромным количеством файлов из внешних источников: чтение, обработка, загрузка и запись всех этих данных в БД. При этом существует большое количество различных источников и форматов этих файлов.
Глобально задача состоит в том, чтобы осуществить загрузку из внешних источников в единую внутреннюю систему для последующего анализа и прогноза.
Данные: источники и файл
Информация об источниках.
Сейчас в системе около 350 источников. При этом одновременно может поступать до 100 штук новых.
Важное уточнение: один источник — это файл уникальной структуры. Если названия столбцов различаются, то это уже новый файл, новый источник данных.
Информация о файле. Это обычный классический csv-формат. Разделителем может быть либо «;» либо «,». Число колонок — от 5 до 200, но распарсить нужно только количество, которое обозначено в техническом задании. В нашем случае доходило до 40 колонок, все остальное — системные поля. Число строк всегда большое — от 5 млн до 20 млн.
Ниже представлен классический пример файла с условными обозначениями.
Отлично, вся информация о данных у нас на руках! Что же делаем? Тут возможны два пути.
«Решение в лоб»
Первая идея, которая у нас возникла, — статическая реализация или, по-другому, «решение в лоб». При такой реализации мы для каждого источника пишем свой парсер и применяем его — эффективно и быстро. Поговорим о преимуществах чуть подробнее.
Преимущества решения
Быстрая разработка. Раньше уже существовало какое-то количество источников и мы уже знали, как работать по этому пайплайну: без лишних оптимизаций и размышлений. То есть мы приоритезируем источники, отдаем наиболее важные по значимости в первую очередь, а сами занимаемся другими.
Работает надежно, как швейцарские часы. Когда мы пишем парсер на конкретный источник, мы можем написать на него тест. И перед тем как выкатывать, надо посмотреть — а точно ли поведение такое же, как ожидается? Тут мы говорим про стабильность и проверенность того, что наш парсер работает.
Без непредвиденных side-эффектов. Все парсеры строго императивны и изолированы и, благодаря этому, более стабильны. Ведь когда мы говорим про enterprise-разработку, мы точно не хотим, чтобы один наш парсер сломал всю систему.
Пайплайн «решения в лоб»
Сходить на нужный внешний сервер по SFTP.
Именно SFTP, потому что это спецификация нашей задачи, мы так договорились, нам так комфортно общаться между серверами.
Забрать файл.
Применить парсер.
Сохранить в БД.
Этапы написания парсера
Работа аналитика. Сверка с локальной структурой БД и проектирование перевода из внешнего имени во внутреннее. Нам нужно узнать, что, где и как хранится, какая между всем этим связь. Данные формируются в виде спецификации, требований и отдаются разработчику.
Работа Backend-разработчика.
Написание кода для валидации файла и его перевода. То есть это сам код, который осуществляет разбор данных в соответствии с ожидаемой структурой.
Добавление конфигурации в общую структуру парсеров: на какой сервер идем, куда и как данные сохраняем. Мы обобщили все на уровне кода, а теперь надо это явно прописать, куда мы идем, чтобы получить данные о конкретном источнике.
Например, когда у нас SFTP-сервер, я не думаю, что вы будете писать одни и те же четыре строчки кода в каждом парсере. Скорее всего, это будет какая-то функция или класс, которому передаем имя файла и доступы для SFTP-сервера, чтобы он пустил в систему.
Работа тестировщика. Отладка, тесты на деве, стейдже на обрезанных данных. На этом этапе мы выявляем и устраняем ошибки в коде парсера. То есть тестировщик должен проверить две вещи:
работает ли код. То есть он берет маленький кусочек файла, например 100–1000 строчек, и смотрит, встали ли данные в БД, в интерфейс, не свалилось ли что-то и работает ли функционал в целом;
какая скорость. Нужно понять, удовлетворяем ли мы скорости загрузки и нет ли каких-то проблем.
Работа тестировщика и DevOps-инженера. Пуско-наладка на реальных данных. Далее мы все выкатываем на прод, делаем первые итерации, проверяем, все ли работает: встали ли данные в интерфейс, ничего ли не потерялось, выдерживаем ли по ресурсам.
Недостатки решения
Время разработки пропорционально количеству парсеров. Для разработки одного парсера нам нужно 5 дней. Сверка со структурой БД занимает 1 день, написание парсера — 2 дня, на отладку/тесты и пуско-наладку закладываем еще по одномуу дню.
На один парсер — 5 дней, а на 100 — целых 1,5 года. Процесс, конечно, можно оптимизировать и вести параллельно: аналитику не ждать разработчика, а разработчику не ждать тестировщика. Но тем не менее все это в сумме — большой объем очень рутинной работы.
Время доработок пропорционально количеству парсеров. Классический пример: есть 200 источников, при этом появилось требование о том, что столбец — это не целое число, а число с плавающей запятой. И теперь нужно это отвалидировать, чтобы все было в порядке, иначе данные не будут сходиться. А в случае переиспользования кода (DRY) нужно еще приложить усилия к тому, чтобы подумать, как это сделать. К тому же нужно заново пройти по пайплайну. Мы сделали изменения — значит, нам нужно все заново проверить, посмотреть, выкатить и замерить все парсеры.
По итогу формула будет такой: время множественного изменения = время одного изменения * число парсеров. То есть время на расширение числа парсеров или валидируемых ими полей эквивалентно времени разработки. Не получится делать быстрее дублирующуюся работу, придется делать с такой же скоростью.
Расширение числа парсеров или валидируемых ими полей запускает пайплайн заново. Подобная ситуация случилась лично с нами. Изначально было N парсеров, далее убрали 10 и потом добавили еще 15 сверху. А после этого в 20 имеющихся парсерах изменился состав файла и добавился еще один внешний сервер. Приходится начинать все сначала: аналитика –> разработка –> тестирование.
Ключевой вопрос в этой всей ситуации — как преодолеть проблемы «решения в лоб» и сделать результат нашей работы максимально самодостаточным? Мы подумали об этом и пришли к другой идее — динамической реализации.
Динамическая реализация
Давайте вспомним No-code-приложения, например Tilda. Или такой конструктор мобильных приложений, где вы тащите формы, а затем система сама выполняет работу. Код генерируется — вы наслаждаетесь. Примерно то же самое мы сделали в рамках нашего проекта.
В один момент мы подумали: «А что, если разработать пользовательский интерфейс, который позволит самостоятельно решать задачи, минуя аналитика, разработчика и тестировщика?» То есть пользователь сможет сам создать описание для своих действий в админке в виде шаблона. Затем наш магический механизм обработает созданный шаблон. А динамический парсер интерпретирует файлы, соответствующие структуре, описанной в шаблоне, без необходимости дополнительной ручной обработки.
Эту идею мы назвали — шаблон динамического маппинга.
На картинке ниже представлено, как это все можно изобразить с точки зрения пользовательского интерфейса. В этом списке темплейт — наш источник. Мы его назвали шаблоном.
Далее представлено, какие примерно атрибуты могут быть у этого шаблона:
название (name) — на что смотреть в интерфейсе;
внешняя система (external system) — на какой сервер нужно пойти, чтобы достать конкретный файлик (уникальное системное имя шаблона);
имя файлика и директория, в которой он лежит (filepath) — путь к файлу на сервере, с которым связан этот шаблон, и здесь же и имя файла;
системное обозначение для источника (system name) — выбор из внешних систем, куда мы будем подключаться, чтобы туда идти по пути выше.
Дальше мы уже говорим о том, что внутри этого источника. Это находится в отдельной сущности — attributes. Эта сущность включает в себя такие элементы, как:
name — чтобы человеку было на что смотреть в интерфейсе;
system_name — уникальное системное имя поля шаблона, которое мы должны искать в файле;
type — тип данных поля, такие как float, str, str_alpha_numeric, date, int, bool;
field_representations (представления поля) — JSON-структура, представляющая отображение поля на БД;
template (шаблон, Foreign Key) — связь шаблона с общей инфой, к которому относится данное поле.
На каждом этапе присутствует валидация, которая проверяет, например, наличие файлика, полей в файлике, соответствие типу. И, в конечном итоге, это все может записаться в БД.
Сейчас мы построили чисто концептуальное решение. Давайте разберемся, какой результат нам бы принес этот подход — поговорим о его преимуществах.
Преимущества решения
Элемент продукта закончен
Такой формат реализации сокращает все возможные согласования. Вместо написания отдельного парсера для каждого нового файла и его источника создаем шаблоны, описывающие структуру файла и определяющие соответствие полей самостоятельно. Это существенно сокращает процесс работы, к тому же позволяет сразу же проверить результат. И это будет работать уже завтра. Сегодня написал — завтра это уже готово, сегодня придумал — завтра уже на проде валидируешь новый файл.
Настройка и поддержка, которая оптимизирует время
В случае со статической реализацией каждый день на протяжении полутора лет придется заниматься переводом спецификаций в код. Естественно, это не творческая и скучная задача. Мы все-таки хотим закрыть эту задачу и, конечно, сделать это наиболее интересным для нас способом.
При динамической реализации настройка и поддержка будут намного интереснее, чем просто сконструировать «решение в лоб» и сидеть полтора года переводить спецификации.
Возникает ряд вопросов. А что если где-то что-то отвалится? А почему данные не загружаются? А как вообще это все сохранить в табличку и как это все будет выглядеть? А как эти 20 млн строчек обработать? Явно придется над всем этим поразмыслить. 5 часов подумать — 1 час написать код.
Масштабируемость
Появляются новые требования:
добавить в N-количестве источников проверку на дубликаты и действие, которое надо совершать, если они есть;
а еще в M-источниках добавить проверку на отрицательность.
При этом непонятно, будут ли все эти поля в файле.
Так у нас появились дополнительные атрибуты у поля шаблона:
required (обязательное) — флаг, указывающий, является ли поле обязательным для заполнения;
can_be_negative (может быть отрицательным) — флаг, указывающий, может ли поле содержать отрицательные значения;
contained_in_md_table (содержится в таблице md) — имя таблицы md, в которой содержится это поле (если применимо);
contained_in_md_attribute (содержится в атрибуте md) — имя атрибута md, в котором содержится это поле (если применимо);
duplicates_in_table (дубликаты в таблице) — имя таблицы, в которой разрешены дубликаты этого поля (если применимо);
duplicates_attribute (атрибут дубликатов) — атрибут, определяющий дубликаты этого поля (если применимо);
duplicates_action (действие с дубликатами) — выбор из действий по обработке дубликатов: обновление или пропуск.
И на все это есть 5 дней — вспоминаем сроки разработки одного парсера. В такой системе мы на этапе валидации прописываем новые условия один раз, а дальше клиент уже сам работает с шаблонами и сам отвечает за выбранные им параметры.
Этапы реализации динамического решения
Проектируем API
Существует два вида доступов:
admin API (CRUD — в админке) — могут вносить изменения и создавать новые записи;
user API (Read — для всех пользователей) — есть возможность только читать.
Какие инструменты будем использовать:
Python и Django — это решение удобно для нас, к тому же мы используем его с начала работы над проектом.
Blazingly-Fast Polars — о том, почему мы выбрали именно этот инструмент, рассказывал наш тимлид в статье Битва медведей: Pandas против Polars. Если кратко: этот вариант для наших вариантов использования работает быстрее, чем Pandas.
Paramiko — библиотека для подключения по SFTP. Очень красиво и надежно.
SQLAlchemy — в качестве дополнительной ОRМ. Удобный интерфейс, быстро и красиво.
Создаем сущности шаблона и поле шаблона
class Template(models.Model):
name = models.CharField(max_length=255)
system_name = models.CharField(max_length=255, unique=True)
file_path = models.TextField()
external_system = models.CharField (choices=ExternalSystem.choices)
@property
def temporary_table_name(self):
return f"temporary_{self.system_name}"
class Meta:
db_table = f'{TEMPLATE_DB_SCHEMA_NAME}"."templates'
Здесь мы перевели сущность модели в сущность шаблона, которая была до этого. Из интересного — здесь есть функция temporary_table_name, в ней мы получаем temporary-имя. Это название временной таблицы. О том, для чего нам нужна временная таблица, поговорим чуть позже.
Далее то же самое делаем для TemplateField. Можно просто взять, перевести на другой язык — и все готово.
class TemplateField(models.Model):
name = models.CharField(max_length=255)
system_name = models.CharField(max_length=255)
field_representation = models.JSONField()
type = models.CharField(choices=Type.choices)
template = models.ForeignKey(Template, on_delete = models.CASCADE, related_name = "fields",)
class Type(models.TextChoices):
FLOAT = "float"
STR = "str"
STR_ALPHA_NUMERIC = "str_alpha_numeric"
DATE = "date"
INT = "int"
BOOL = "bool"
Пишем Admin API
Как я и говорил, у админов будет полный набор CRUD-операций: чтение, создание, обновление. Но следующий момент более интересный.
path(
"templates",
TemplatesAdminViewSet.as_view(
{
"get": "list",
"post": "create",
}
),
name="admin-templates",
),
path(
"templates/<int:id>",
TemplatesAdminViewSet.as_view(
{
"get": "retrieve",
"put": "update",
"delete": "destroy",
}
),
name="admin-template",
),
Вот эти два поинта ниже нам нужны для того, чтобы у клиента в интерфейсе была возможность посмотреть, какие у нас есть таблички и атрибуты и поля этой таблички.
path(
"templates/utility/db-tables",
TemplatesUtilityAdminViewSet.as_view(
{
"get": "db_tables",
}
),
name="admin-template-utility-db-tables",
),
path(
"templates/utility/columns/
<str:full_model_name>",
TemplatesUtilityAdminViewSet.as_view(
{
"get": "columns",
}
),
name="admin-template-utility-columns",
),
Классические представления — обычные классы доступа. Здесь идет пагинация по страницам. Далее, когда мы предоставляем список шаблонов, мы передаем только основную информацию о них — название. А когда отдаем только один шаблон, мы его отдаем вместе с атрибутивным составом, чтобы пользователь мог убедиться, что у него все правильно загружается.
class TemplatesViewSet(RetrieveModelMixin, GenericViewSet,):
permission_classes = [permissions.IsAuthenticated]
pagination_class = CustomPageNumberPagination
def get_serializer_class(self):
match = self.action:
case "list"
return TemplateSerializer
case "retrieve"
return TemplateWithFieldsSerializer
case :
assert_never(self.action):
Пайплайн загрузки
Загрузка файла
На этом этапе мы получаем какой-то путь файла у этого темплейта, то есть не пишем явно — идти на такой-то сервер. А просто говорим — взять у темплейта название файла. И на выходе получается путь файла.
file_path = template.file
Далее через библиотеку мы подключаемся по SFTP, забираем файл.
transport = paramiko.Transport ((SFTP_HOST,SFTP_PORT))
transport.connect (None,SFTP_USERNAME,SFTP_PASSWORD)
with paramiko.SFTPClient.from_transport(transport) as sftp_client:
paramiko_file = sftp_client.open(file_path,"rb")
Валидация файла
У темплейта есть филды — template.fields. Мы их забираем — это и будут наши правила валидации.
django_file = InMemoryUploadedFile
validate_file(django_file, list(template.fields.all()))
Этапы валидации
BasicFileHeadersValidator. В процессе валидации проверяем наличие данных, дублирование колонок, наличие требуемых колонок. Если какой-то из этих пунктов не проходит, мы отправляем пользователю ошибку, так как нам незачем загружать файл, у которого нет требуемых нам колонок. Мы знаем, что он заведомо сохранит его туда, куда нам не надо.
BasicFileReaderValidator / GeneralValuesFieldsValidator. Базовое чтение (проверка на кодировку, соответствие строк размеру), перевод названий колонок согласно шаблону, нормализация данных и проверка их на соответствие типам, генерация дата-фрейма.
Особенности работы с большими файлами
Даже если взять 10 млн строк, с учетом того, что параллельно работает 100 источников, cкорее всего, они упадут по памяти.
for chunk in file.chunks():
Что с этим делать? Классический вариант — разбиение на чанки. В результате этого получается кусок файла, c которым можно продолжать работать. Далее с ним проводим соответствующие этапы валидации и формируем соответствующий дата-фрейм. Но в такой структуре важно, чтобы мы не захотели сделать дополнительную логику — например, агрегацию, дезагрегацию, суммирование, фильтрацию, категоризацию. Для всех этих работ мы используем сохранение во временную таблицу, а потом при необходимости все это сохраняем в основную БД.
Если требование состоит в том, что решение нужно кастомное, то это делает уже другой разработчик в рамках другой задачи. Он идет во временную таблицу, забирает все нужные данные и, соответственно, с ними делает то, что нужно ему. При этом временную таблицу нужно каждый раз чистить перед загрузкой, иначе вы упадете по памяти в БД или будете работать со старыми данными.
Сохранение во временную таблицу
Что будем использовать дальше? Во-первых, SQLAlchemy. Собираем из дата-фрейма название колонок, делаем сущности колонок, берем название таблицы и с помощью контекстного менеджера вставляем какое-то количество записей. В нашем случае — 1 млн.
def save_to_db(table_name: str, data: pl.DataFrame) -> None:
columns = [column(column_name) for column_name in data.columns]
db_table = table(table_name, *columns, schema=TEMPLATE_DB_SCHEMA_NAME)
with (
create_sa_engine_to_django_db() as session,
Session(engine) as session,
session.begin(),
):
session.execute(
insert(db_table),
data.rows(named=True),
)
Что такое контекстный менеджер
from contextlib import contextmanager
@contextmanager
def create_sa_engine_to_django_db():
...
В Python можно использовать удобный декоратор для этого паттерна. Перед началом мы напишем такой connection_url, где мы вставляем доступ к БД. Здесь конструкция try-finally говорит о том, что мы пытаемся отдать наш движок подключения к БД. Но в любом случае, какие бы ошибки ни были, этот движок будет в конечном итоге закрываться. Это нужно для того, чтобы в БД не висело открытое подключение.
from django.db import connection
from sqlalchemy.engine import URL
connection_url = URL.create(
"postgresql",
connection.settings_dict["USER"],
connection.settings_dict["PASSWORD"],
connection.settings_dict["HOST"],
connection.settings_dict["PORT"],
database=connection.settings_dict["NAME"],
from sqlalchemy import create_engine
engine = create_engine(connection_url, future=True)
try:
yield engine
finally:
engine.dispose()
Сохранение в основную таблицу.
Берем нужную нам модель представления данных, написанную на Django.
from django.apps import apps
django_model = apps.get_model(model_info[0])
Здесь есть специфичный для Django код. Но я почти уверен, что таким образом можно получить табличку из основной БД на любом другом языке.
model_fields: dict = model_info[1]
if "id" in model_fields:
primary_key_name: int = "id"
elif "external id" in model_fields:
primary_key_name: str = "external id"
else:
primary_key_name: str = "name"
Дальше мы смотрим поля у этой таблицы и пытаемся найти ее первичный ключ. Для нашего проекта специфично то, что может быть три разных первичных ключа, в зависимости от таблички: обычный ID, External ID либо Name ID.
value_filters = set()
for record in self.data.select(dataframe_fields).iter_rows(named=True):
primary_key_value = record.get(model_fields.get(primary_key_name))
value_filters.add(primary_key_value)
Итак, мы получили название первичного ключа. Далее мы идем в БД и проверяем, какие записи с этими ID уже есть.
Мы берем их, кладем в оперативную память.
existing_records = django_model.objects.filter(
**{f"{primary_key_name}_in: value_filters}
).first()
existing_records = {getattr(row, primary_key_name): row for row in existing_records}
После этого мы идем по дата-фрейму и проверяем, есть ли у нас такая запись.
Если не смогли найти с этим ID объект среди тех, что мы вытащили из БД, то сохраняем ее как новый объект, кладем в какую-то структуру (в нашем случае список) и затем позже создадим их в БД.
for record in self. _data.select(dataframe_fields).iter_rows(named=True):
primary_key_value = record.get(model_fields.get(primary_key_name))
existing_record = existing_records.get(primary_key_value)
if not existing_record:
content = {
field_name: objects.get(model_fields.get(field_name))
for field_name in model_fields.keys():
}
instances_to_create.append(django_model(**content))
Но если мы все-таки смогли найти этот ID, то берем и меняем у найденного объекта все атрибуты.
else:
for field_name in model_fields.keys():
if field_name == primary_key_name:
continue
setattr(
existing_record,
field_name,
record.get(model_fields.get(field_name)),
)
instances_to_update.append(existing_record)
Условно, в файле одно значение, в БД другое значение для этой строчки — поменяли, сохранили в структуру. И здесь происходит множественное сохранение/обновление. Важно использовать batch_size. Потому что создавать запрос на миллион строчек — невыгодно. Делаем batch_size, разбиваем структуру на множество запросов и делаем с ним.
django_model.objects.bulk_create(
instances_to_create,
batch_size=1000)
django_model.objects.bulk_update(
instances_to_update,
fields=fields_to_update,
batch_size=1000)
batch_size=1000 — число объектов в каждом запросе.
Заклинание освоено! А теперь поговорим о недостатках такого решения.
Недостатки динамического решения
Трудности в отладке и тестировании. Это интересно, но приходится думать, тратить время, пытаться понять, где ошибка, почему не можем распарсить — затратно дебажить.
Строгость в соблюдении принципов. Важно соблюдать всю последовательность действий, про которую мы говорили ранее (этапы реализации динамического решения). Мы не должны запихнуть весь файл в систему, а на выходе говорить, что сделаем полностью кастомную работу.
Ограничение в функциональности. Здесь как раз идет речь об отсутствии некоторых кастомных полей, все динамическое. И если вы тронете хотя бы одну строчку, то для всех остальных это изменение автоматически применится. Один файл, но его тяжело поддерживать.
Дополнительное время на валидацию. Многие знают, что функции в Python вызывать довольно дорого, поэтому вы можете столкнуться с тем, что валидация может занимать много времени. Ни в коем случае не пытайтесь запустить код, который кажется примерно рабочим. Посмотрите, какие есть практики использования функции, почему нельзя использовать лямбду-функцию в цикле и т. д.
Дополнительное время на сохранение во временную БД и/или основную БД. Мы говорим, что у нас есть временная таблица, и после нее вы можете делать некоторые кастомные операции, которые вы хотите. Но в то же время мы тратим минуту-две на то, чтобы сохранить эти данные, и, плюс ко всему, они занимают какую-то дополнительную память на диске.
Дальнейшие шаги
Как мы можем обойти эти минусы:
Расширение поддерживаемых форматов файлов. Здесь имеются в виду Excel и все подобные структуры. Нам не потребуется писать отдельный парсер для каждого такого формата. Можно просто добавить новый вариант ридера, добавить его в общую структуру и смотреть на расширение файла перед загрузкой.
Оптимизация производительности. Пытаемся ускорить валидацию, оптимизировав все возможные куски, меняя пайплайн вызова валидаторов.
Развитие интерфейсов и возможных конфигураций. Админу нужно дать возможность делать batch_size для источника. То есть он знает, что, например, у него будет 100 валидированных колонок, и здесь batch_size в 100 тыс. — это слишком много, а 1,5 тыс. — уже нормально. Пусть у него будет возможность заменить на любое значение, которое он сам посчитает нужным. Дальше, если говорить о других возможных интерфейсах, у нас есть проверка на отрицательность на случай дублирования (мы меняем эту строчку или пропускаем ее и говорим, что нам все равно, есть ли она).
И на этом все! В статье я поделился своим опытом, основанным на решении конкретной задачи. Надеюсь, что материал поможет вам выбрать правильное решение в аналогичной ситуации и покажет, как можно творчески подходить к задачам. А если у вас появятся вопросы — буду рад на них ответить в комментариях к этой статье!
Статья подготовлена по мотивам доклада Никиты Ильина, Backend-разработчика в Spectr, на митапе #DevTalks. Ссылка на запись доклада:
reasoned_critique
спасибо, кейс годный. надо будет ознакомится с вашим youtube