Можно было бы назвать эту статью "Yet another analytical database", если бы не тот факт, что Apache Doris построен на архитектуре MPP, которая изначально ориентирована на параллельные вычисления и использование распределенного хранения и обработки данных на кластерах. Изначально проект Baidu, инструмент позволяет подготавливать аналитические панели с обновлением в реальном времени, при этом источниками данных могут быть как потоки из внешних источников (логи событий, time series-данные), так и источники из Data Lake (например, Apache Iceberg или Hive). В этой статье мы рассмотрим основные моменты использования Apache Doris на простом примере хранения и простой обработки данных о погоде.

Apache Doris представляет собой комбинацию Backend-хранилища и Frontend-обработчиков запросов, при этом каждый из уровней может быть реализован несколькими узлами для отказоустойчивости и балансировки нагрузки. 

Для внешнего доступа используется протокол, совместимый с MySQL, что позволяет использовать клиентские библиотеки и инструменты, разработанные для MySQL / MariaDB без необходимости модификации. Внутри Apache Doris представляет собой колоночное хранилище и использует индексы для ускорения поиска. Модель данных также представляется в виде таблиц с типизированными столбцами.

При поступлении запроса frontend сервер планирует его выполнение на кластере backend-узлов, при этом сама реализация backend использует возможности SIMD и многозадачности в пределах одного процессора, что позволяет получить очень высокую производительность даже на небольшом кластере. Запрос декомпозируется на несколько этапов, каждый из которых может принимать на вход вектор значений от предыдущего и выполнять обработку параллельно с получением результатов предшествующего шага.

При установке frontend настраиваются два типа узлом: follower (будет участвовать в выполнении операций, один из них выбирается leader) и observer (не участвует непосредственно в определении плана операции, но нужен для обеспечения кворума для голосования). Backend будет использовать дисковое пространство для хранения данных (создаются три копии каждого блока данных). 

Для тестирования будет достаточно установить один frontend-сервер и один backend. Также для взаимодействия с внешними источниками данных нужно установить процесс брокера. 

Сначала запустим frontend-сервер. Зададим переменную DORIS_HOME:

export DORIS_HOME=$HOME/.doris/

и создадим каталог для хранения данных:

mkdir $DORIS_HOME/doris-meta

Архив Doris может быть получен с этой страницы. Распакуем архив и запустим:

bin/start_fe.sh --daemon

Создадим теперь каталог для backend:

mkdir -p be/storage

и запустим backend-сервер:

bin/start_be.sh --daemon

Установим mysql client, подключимся к frontend-серверу и зарегистрируем backend-сервер:

./mysql-client -h localhost -P 9030 -uroot
ALTER SYSTEM ADD BACKEND "127.0.0.1:9050";

Также запустим брокер и зарегистрируем его на frontend:

bin/start_broker.sh --daemon
./mysql-client -h localhost -P 9030 -uroot
ALTER SYSTEM ADD BROKER weather "127.0.0.1:8000";

Наиболее важные отличия становятся заметны при создании таблицы. Описание столбцов данных совместимо с mysql, но кроме обычных столбцов могут быть добавлены столбцы агрегации (min/max/sum/replace), которые будут накапливать значения при обновлении базы. Например, столбец с типом replace будет заменять значение, sum - добавлять к текущему значению, min/max - актуализировать минимальное и максимальное значение. Например, мы можем накапливать наименьшее и наибольшее значение температуры за время наблюдения и дату последнего наблюдения. Создадим новую базу данных и таблицу:

CREATE DATABASE example;

CREATE TABLE IF NOT EXISTS weather.weather
(
    `measurement_id` LARGEINT NOT NULL COMMENT "user id",
    `date` DATE NOT NULL COMMENT "data import time",
    `city` VARCHAR(20) COMMENT "city",
    `temperature` FLOAT COMMENT "temperature",
    `last_measurement` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "last visit date time",
    `min_temperature` FLOAT MIN DEFAULT "100" COMMENT "minimum temperature"
    `max_temperature` FLOAT MAX DEFAULT "0" COMMENT "maximum temperature"
)
AGGREGATE KEY(`measurement_id`, `date`, `city`, `temperature`)
DISTRIBUTED BY HASH(`measurement_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Для импорта данных будем использовать csv-файл, для этого сначала создадим пользователя, а затем передадим файл в брокер:

CREATE USER test IDENTIFIED BY 'test';
GRANT ALL ON example TO test;
curl --location-trusted -u test:test -H "label:initial" -H "column_separator:," -T weather_data http://localhost:8030/api/example/weather/_stream_load

Файл должен содержать данные для всех столбцов и разделяться в этом случае через запятую:

1,2022-11-13,St.Petersburg,2.7,2022-11-14,2.7,2.7
2,2022-11-14,St.Petersburg,2.3,2022-11-14,2.3,2.3

Для импорта также могут использоваться следующие источники:

  • загрузка из JSON (также используется _stream_load с заголовком format: json)

  • подписка на Apache Kafka (используется CREATE ROUTINE LOAD name ON table FROM KAFKA(config)

  • подписка на внешние таблицы CREATE EXTERNAL RESOURCE name PROPERTIES (type="...", host="...", ...)

  • загрузка через брокер: LOAD LABEL name WITH BROKER name (config)

  • подписка на изменения базы mysql (через Change Data Capture)

  • Spark ETL для преобразования поточных данных

Заполненные таблицы могут быть отправлены через брокер во внешнее хранилище (например, HDFS) через EXPORT TABLE ... TO ..., также можно отправить результаты запроса SELECT ... INTO OUTFILE "hdfs://..." (или в локальный файл со схемой file://)

Для интеграции внешних источников данных можно использовать инструменты для ETL (например, StreamX), ElasticSearch - CREATE TABLE ... ENGINE=ELASTICSEARCH PROPERTIES (...). логи из Logstash,

Для визуализации актуальных значений базы данных (или данных агрегации) можно использовать Grafana и плагин для источника данных в MySQL, Metabase с поддержкой MySQL, Dashboard Builder и многие другие инструменты, поскольку Doris полностью имитирует протокол MySQL и поддерживает SQL99 и некоторые расширенные команды MySQL. 

Также в Doris поддерживаются функции по работе с массивами значений (могут быть сохранены в столбце), поддерживаются операции с геоданными, агрегации по группам значений, также поддерживаются математические, статистические функции, шифрование и хэширование, аналитические функции для групп последовательных значений (скользящее окно). 

Проект представляет особый интерес благодаря возможностям интеграции с различными источниками данных, при этом все операции выполняются с максимально возможной скоростью с поддержкой MPP и распределенных вычислений. Сейчас проект Apache Doris имеет статус Top-Level Project в Apache и последовательно расширяет список плагинов для импорта-экспорта данных, а поддержка MySQL протокола позволяет использовать привычные инструменты визуализации уже сейчас.

Статья подготовлена в преддверии старта курса "Базы Данных" от OTUS. Также приглашаю всех желающих на бесплатный вебинар по теме: "Разбор типовых SQL-задач с собеседований для веб-разработчиков".

Комментарии (0)