В этой статье я расскажу вам, как использовать DataFusion для создания собственного опыта разработки баз данных.

Базы данных являются одними из самых сложных частей программного обеспечения, задуманных с момента появления вычислительной эры более полувека назад. [1] Почти каждая часть технологии в конечном итоге касается базы данных в той или иной форме. Несмотря на повсеместность баз данных в программном стеке, подавляющее большинство разработчиков были приучены относиться к базам данных как к более или менее черному ящику — сложным плотным чарам программного обеспечения, созданным волшебниками и знатоками, укрывшимися в элитных рядах компаний, занимающихся базами данных, или в таких местах, как Google. Поговорка для остальных из нас, как она есть, — никогда не пытайтесь написать свою собственную базу данных .

Тем не менее, несмотря на их долговечность, мы наблюдаем постоянные инновации в этой области, которая впервые началась с появлением Hadoop около 2 десятилетий назад. На сайте ClickBench теперь перечислено более 50 баз данных в его наборе тестов [2]. И это только аналитические движки. С учетом последних тенденций переписывания всех больших данных на Rust [3] не проходит и месяца, чтобы интересный новый проект не оказался в тренде Hacker News. В этой статье мы рассмотрим, насколько легко (или сложно) создавать базы данных с помощью Apache Datafusion и можете ли вы, будучи простым смертным, на самом деле реально создать собственную базу данных и внедрить инновации вокруг опыта разработчика.

Большинство современных баз данных можно разбить на вычислительные и хранилищные слои, со сложными механизмами запросов, отвечающими за «вычислительную» часть базы данных. Механизм запросов обычно состоит из анализатора запросов, генерации логического плана и затем генерации физического плана для запуска вычислений на механизме выполнения. Запрос обычно проходит через несколько фаз оптимизации при генерации логического плана, а также генерации физического плана. Независимо от того, каков целевой вариант использования конечной системы, механизм запросов более или менее следует этой модели.

Учитывая десятилетия исследований баз данных, которые были проведены в каждом из этих отдельных слоев, планка для написания функционального движка запросов с функциями ставок таблиц остается поразительно высокой. И вам нужно прибить все это, прежде чем вы сможете приступить к написанию функций, специфичных для вашего варианта использования. Хотя существует множество проектов, которые помогают вам писать некоторые из этих слоев по отдельности, Apache DataFusion остается единственной игрой в городе, которая помогает вам со всем спектром.

Вы можете подумать о DataFusion extensible database development toolkit. На самом базовом уровне вы можете использовать его как механизм запросов a la DuckDB с его встроенными фронтендами SQL и Dataframe, в то же время вы можете расширять или даже полностью заменять различные слои, чтобы полностью построить свой собственный опыт.

В оставшейся части этой статьи мы рассмотрим, как расширить DataFusion, добавив собственные операторы в его механизм выполнения, а затем пропустить его через Физический и Логический планировщики и представить его на внешнем интерфейсе.

Создание топового DataFusion

Архитектура DataFusion

В Denormalized мы создаем Duck DB, как одноузловой опыт для приложений потоковой обработки. Хотя DataFusion и поддерживает неограниченные вычисления, у него нет потокового оконного оператора. Windows лежит в основе приложений потоковой обработки, они предоставляют простой способ объединения бесконечных потоков данных в конечные блоки, чтобы мы могли применять к ним агрегации.

Для этого урока мы реализуем простой оконный оператор для бесконечных потоков. Наш оператор будет иметь следующую сигнатуру --

pub fn window( self group_expr: Vec<Expr>, aggr_expr: Vec<Expr>, window_length: Duration, slide: Option<Duration>, ) -> Result<Self> { ... }


Написание плана выполнения

ExecutionPlan представляет собой узел в физическом плане DataFusion. Это то, куда будет помещен фактический код с нашими пользовательскими вычислениями. Модель выполнения DataFusions основана на pull, что означает, что выполнение начинается с приемников и продвигается вверх по физическому плану. Вызов метода execute для этого признака создает асинхронный SendableRecordBatchStream пакетов записей путем инкрементного получения раздела выходных данных путем выполнения вычислений над входными данными Execution Plan.

В нашем случае использования метод execute() ExecutionPlan возвращает struct GroupedWindowAggStream , который реализует RecordBatchStream, обертку вокруг свойства futures::Stream. Фактические вычисления должны быть реализованы в poll_next() реализации Stream.

impl RecordBatchStream for GroupedWindowAggStream { fn schema(&self) -> SchemaRef { self.schema.clone() } } impl Stream for GroupedWindowAggStream { type Item = Result<RecordBatch>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let poll = self.poll_next_inner(cx); // Code to compute the record batch goes here. ... }


В нашем примере вызов poll_next_inner для окна потоковой передачи решает эту проблему.

  1. Обработка входящих данных.

  2. Накопление рядов в открытых окнах.

  3. Обновление водяного знака (который находится за мьютексом)

  4. Открытие новых окон при необходимости.

  5. Закрытие всех запускающих окон и создание из них выходных RecordBatches.

    Подключение к Physical Planner

    Создав наш собственный Execution Plan, нам нужно сообщить Physical Planner о его существовании. Реализация ExtensionPlanner для нашего ExtensionPlan — это все, что нам нужно сделать здесь.

    Расширение логического плана

    Теперь, когда у нас есть реализованный вместе с ExtensionPlanner пользовательский план выполнения, нам нужно добавить сопутствующий узел в логический план. Это не только позволяет нам выставлять его на фронтенды SQL/DataFrame, но и подключаться к логическим оптимизаторам для оптимизаций, таких как выталкивание предикатов.

    В DataFusion нам сначала необходимо реализовать определяемый пользователем узел логического плана , а затем добавить LogicalPlanExtension в конструктор логического плана, который предоставляет его интерфейсам SQL/DataFrame.

    Логический план к физическому плану

    Последняя часть головоломки — это точка соприкосновения, где логический план преобразуется в физический план. Для этого мы реализуем пользовательский QueryPlanner , который гарантирует, что физический планировщик инициализируется с пользовательскими расширениями, которые мы написали для нашего ExecutionPlan.

    Пользовательские правила оптимизации

    Поскольку наш оператор реализовал группу путем агрегации, нам нужно гарантировать, что все строки для определенной группы окажутся в одном разделе. Для этого мы добавим новое физическое правило оптимизации , чтобы добавить оператор HashPartition к нашим групповым ключам.

    Собираем все вместе

    Наконец, все, что нам нужно сделать, это создать сеанс DataFusion с помощью пользовательского QueryPlanner, который мы написали выше, а также с дополнительным правилом физического оптимизатора, которое мы добавили, и вуаля, теперь у нас есть собственная расширенная версия DataFusion.

    let state = SessionStateBuilder::new() .with_default_features() .with_config(config) .with_query_planner(Arc::new(StreamingQueryPlanner {})) .with_optimizer_rules(get_default_optimizer_rules()).with_physical_optimizer_rule(Arc::new(EnsureHashPartititionOnGroupByForStreamingAggregates::new(), )) .build();

Комментарии (0)