Привет, дорогой друг. Меня зовут Максим и я data-инженер в одной из прекрасных команд AI area компании Домклик. Правильно перевозить байтики с места на место нам помогает огромный зоопарк инструментов. И, кажется, мы приручили ещё одного питомца.

В этой статье хочу поделиться сценарием, который, на мой взгляд, прекрасно описывает вариант использования компактной встраиваемой базы данных DuckDB. Возможно, очевидные вещи, которые будут тут, покажутся вам гениальными или вы просто погладите утёнка и попробуете его в деле.

Коллега, внедривший функциональность, не хотел рассказывать, как утёнок решил часть проблем, с которыми мы столкнулись, но я с ним договорился, и ниже мы опишем, как утёнок живёт сейчас.

Оглавление

Введение

В прошлой статье я упоминал, что основной потребитель данных в наших процессах — это модели. Data scientist-ы обучают модели в Dag‑ах Airflow и зачастую обрабатывают немалые объёмы. А ещё они любят проводить много экспериментов. Особо это ощущается в рекомендательных системах. У нас много моделей, и каждая потребляет необходимый ей объём серверных ресурсов CPU, GPU и RAM, и, конечно же, он не резиновый. Хотелось бы его оптимально использовать, а также не мешать другим командам обучать свои модели. В одном из случаев у нас получилось это сделать с помощью того самого питомца.

Не будем разбирать, кто такой DuckDB, что у него под капотом, какой диалект для работы с ним используется и есть ли удобный API. У инструмента очень хорошая документация, огромное сообщество, оставлю внизу полезные ссылки. Звёзды на GitHub говорят сами за себя, и стоит только упомянуть о том, что Марк Расвельдт и Ханнес Мюлейзен анонсировали, что утёнок стал совсем взрослым и опубликовали релиз stable‑версии, на текущий момент уже 1.1.0.

Итак, перейдём к тому какие боли мы ощутили и как их пытались решать.

Проблемы, которые решали

  • Переиспользуемость данных (большое количество DAG-ов скачивают каждый день одни те же данные).

  • Долгое извлечение данных (скачивание занимает время).

  • Неоптимальная загрузка (все данные собираются в памяти, сериализуются, удваивая потребляемую память).

  • Разнородность данных (Clickstream, Service data, Dictionary, Ml‑features, CSV, Parquet и др.).

  • Тестирование (нет удобного интерфейса для работы с данными).

Варианты решений

До того как пришёл к нам утёнок, мы перекладывали данные Clickstreаm-a в оперативную память и лопали их в DAG‑ах. Например, данные из ClickHouse нужны для работы в нескольких DAG‑ах, и да, вы правильно поняли: каждый DAG лопатил один и тот же объём, каждый день выгружая его в оперативную память. Со временем поняли, что такой подход требует много времени на загрузку данных при обучении. Более того, запросы по объёмам данных от моделей росли и останавливаться не собирались. DAG‑ам становилось всё сложнее и сложнее, они начали периодически падать по OOM, а инструменты не менялись, и мы ушли обдумывать эту проблему.

Что смотрели и смотрим:

  • S3 и Parquet;

  • PV и DuckDB;

  • Compute in DB;

  • Iceberg и Trino;

S3 и Parquet: с отдельным хранением необходимых данных в формате Parquet в S3.

Достоинства:

  • Данные скачиваем один раз и затем переиспользуем, что позволяет скачивать быстро файлы, в сравнении с регулярной загрузкой Clickstreаm-a в память с последующей обработкой.

Недостатки:

  • Всё также долгое скачивание.

  • Переиспользуемость. При масштабировании, если нужен будет другой набор полей или размер окна, возможно, придётся готовить несколько файлов — долго, неочевидная схема данных.

  • Сложно поддерживать.

  • Инкрементально обновлять Parquet — нет.

PV и DuckDB: работа с данные через DuckDB в Persistent Volume (PV).

Достоинства:

  • Инкрементальное добавление новых данных сколь угодно малыми порциями, с возможностью партицирования.

  • Получение данных потребителями: удобный интерфейс в виде диалекта SQL, поддержка выгрузки в Dataframe и другие форматы.

  • Быстрое считывание данных из Persistent Volume в память.

  • Быстрое сохранение на диск, в том числе из pandas.DataFrame.

  • Понятная, контролируемая схема данных, наличие миграций.

Недостатки:

  • Нельзя читать и писать параллельно в разных процессах. Множественное подключение — только с read_only = True. Придётся разносить по времени запись от задач чтения.

Compute in DB: например, подключение внешних источников к ClickHouse и подготовка данных в DB.

Достоинства:

  • Сопоставление и вычисление данных в одном месте.

  • Лёгкое подключение внешних данных.

Недостатки:

  • Проблемы с операцией join.

  • Поддержание жизнеспособности ClickHouse при расчётах.

Iceberg и Trino (а может и DuckDB): вычисление и хранение всех данных под Iceberg-ом.

Текущий вариант рассматривается в рамках будущего исследования, так как ведутся работы по развёртыванию инфраструктуры.

..in progress

PV и DuckDB

А сейчас давай посмотрим, как выглядит домик утёнка, и как тот работает, когда из него выходит. Используем Persistent Volume Claim — сущность k8s для управления хранилищем кластера. В настройках DAG‑а монтируем существующий Persistent Volume (для соответствующего пространства имён). Используем путь примонтированного тома в наших task‑ах для работы с файлами DuckDB.

Рассмотрим на примере логики работы с данными из Clickhouse:

  • Сделать первоначальную загрузку данных за нужный период в PV и S3.

  • Обновлять данные инкрементально: дописывать новые и удалять старые в конце окна (в нашем случае необходимы данные за 6 месяцев). Репозиторий утёнка как раз содержит все эти методы для поддержания актуальности данных:

import datetime as dt
from pathlib import Path

import duckdb

import pandas as pd

from dag.config import DATE_FORMAT


class DuckDB:
    DB_FILE_NAME = Path("clickhouse_actions")
    ACTIONS_TABLE_NAME = "actions"

    def __init__(self, db_directory: Path):
        self.db_file_path = db_directory / self.DB_FILE_NAME

    def query(self, query: str) -> duckdb.DuckDBPyRelation:
        with duckdb.connect(str(self.db_file_path)) as con:
            return con.sql(query)

    def fetchone(self, query: str) -> tuple | None:
        with duckdb.connect(str(self.db_file_path)) as con:
            return con.sql(query).fetchone()

    def insert_data(self, data: pd.DataFrame) -> duckdb.DuckDBPyRelation:
        return self.query(
            f"""INSERT INTO {self.ACTIONS_TABLE_NAME} SELECT * FROM data"""
        )

    def remove_old_actions(self, depth_dt: dt.datetime):
        return self.query(
            f"""
            DELETE FROM {self.ACTIONS_TABLE_NAME}
            WHERE timestamp <= '{depth_dt.strftime(DATE_FORMAT)}'
            """
        )

    def fetch_latest_action_dt(self) -> dt.datetime:
        result = self.fetchone(
            f"""SELECT timestamp FROM {self.ACTIONS_TABLE_NAME}
            ORDER BY timestamp DESC LIMIT 1"""
        )
        if not result:
            raise ValueError("Empty table.")
        return result[0].astimezone()

    def remove_db_file(self) -> None:
        self.db_file_path.unlink(missing_ok=True)

После загрузки данных в оба домика утёнка можно без проблем подключаться к S3 и PV для работы с обновлёнными данными из ClickHouse. Но стоит помнить, что если есть одно подключение к файлу DuckDB на запись, то больше никто подключиться к нему не сможет. Поэтому если писать нет необходимости, то для конкурентного подключения лучше выбрать только режим read_only = True.

Да, вы правильно прочитали — домика два. Если утёнок увидит, что с его Persistent Volume что‑то случилось или ему просто нужно переехать в другой, то он знает, что все его данные дополнительно сохраняются в S3 для аналитики и тестового обучения моделей. Кстати, основной домик Persistent Volume выглядит в DAG‑ах вот так:

from airflow import DAG
from kubernetes.client import models as k8s

NAMESPACE = "неймспейс"
NAME_VOLUME = "домик_утёнка"
VOLUME_MOUNT_PATH = "/app/mount"

VOLUMES = [
    k8s.V1Volume(
        name=NAME_VOLUME,
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
            claim_name=NAME_VOLUME
        ),
    ),
]
VOLUME_MOUNTS = [
    k8s.V1VolumeMount(
        mount_path=VOLUME_MOUNT_PATH,
        name=NAME_VOLUME,
        sub_path=None,
        read_only=False,
    ),
]

dag = DAG(
    # ...
)

kof = KubeOpFactory.open(
    dag=dag,
    namespace=NAMESPACE,
    default_image="IMG-URL",
    common_params={
        "volume_mount_path": VOLUME_MOUNT_PATH,
    },
    # ...
)

extract_all_actions = kof.get_operator(
    task_id="extract_all_actions",
    volumes=VOLUMES,
    volume_mounts=VOLUME_MOUNTS,
)

Кажется, мы получили кеш на стероидах! Если взять время, которое DAG‑и тратили раньше на экспорт данных, условно 30 минут, и загрузку файла DuckDB из Persistent Volume с уже подготовленными и отфильтрованными данными — всего за пару минут, то с помощью DuckDB можно эффективно использовать ресурсы по времени и меньше конкурировать за них с другими командами.

Заключение

Вот мы подошли к концу сказа. Он был небольшим по сравнению с тем объёмом, который обрабатывает DuckDB. В итоге получили необычное решение, которое позволило без расточительства использовать наши инфраструктурные ресурсы, ускорило загрузку и извлечение данных. Переиспользуемость позволила удобно проводить тесты и собирать наборы данных для моделей.

Конечно же, всё не может быть идеально, мы не решили ряд других проблем, связанных с объёмом RAM, который нужен для обучения моделей в моменте. Этим вопросом мы продолжили заниматься и, возможно, расскажем в будущем ещё что‑то интересное.

Всем спасибо за уделённое время! Буду рад обсудить эту тему в комментариях. Всем добра!

Кстати, DuckDB называется так потому, потому что у Ханнеса, руководителя проекта, был домашний питомец — утка по имени Уилбур. А также разработчики считают, что утки выносливы и могут жить за счёт чего угодно, подобно тому, как они представляли себе работу своей базы.

Полезные ссылки

Комментарии (0)