Оптимизация запросов в ClickHouse с помощью создания цепочки материализованных представлений

В ClickHouse материализованные представления (materialized views) являются механизмом, автоматически выполняющим запросы к исходным таблицам при поступлении новых данных. 

Материализованное представление (МП) - это специальный тип таблицы, содержащей результат выполнения запроса к исходным данным. Этот результат фактически представляет собой кэшированное представление данных из исходных таблиц. Одной из ключевых особенностей МП в ClickHouse является их автоматическое обновление. При поступлении новых данных в исходные таблицы МП обновляется, автоматически пересчитываясь в соответствии с определенным запросом.

Таким образом, материализованные представления в ClickHouse предоставляют удобный способ поддерживать актуальные агрегированные данные, позволяя автоматически обновлять результаты запросов при изменениях в исходной информации. Это особенно полезно для выполнения запросов с высокой степенью вычислений или агрегации данных, таких как отчеты, статистика и аналитика.

Приведенная ниже диаграмма наглядно демонстрирует, как это работает:

Введенные строки (Inserted rows)Преобразованные строки (Transformed rows)Материализованное представление (Materialized View)Итоговая (агрегированная) таблица  (Destination Table)Исходная таблица (Source Table)
Введенные строки (Inserted rows)
Преобразованные строки (Transformed rows)
Материализованное представление (Materialized View)
Итоговая (агрегированная) таблица  (Destination Table)
Исходная таблица (Source Table)

Последние пару недель я разбирался с агрегатными состояниями. В качестве демонстрации мною были созданы два материализованных представления, получающих данные из одного и того же табличного движка Kafka (Kafka Source Table). В одном хранились необработанные ("сырые") данные о событиях (Raw Events), а в другом - агрегатные состояния (Aggregation State).

Когда я показал пример Тому, он предложил такой вариант: вместо того, чтобы оба представления считывали данные непосредственно из таблицы Kafka, можно было бы связать МП в виде цепочки. Выходная таблица одного представления становится входной для другого.

Этот подход может быть полезен в случаях, когда необходимо выполнить несколько преобразований данных перед записью в конечную таблицу. Например, первое материализованное представление может выполнять фильтрацию данных, а второе - группировку и агрегацию. Таким образом, можно создать более гибкую и эффективную систему обработки данных.

Ниже приведена схема, которую он имел в виду:

Другими словами, вместо того, чтобы материализованное представление агрегированного состояния (Aggregation State Materialized View) читало данные из таблицы Kafka (Kafka Source Table), я должен сделать так, чтобы оно читало данные из “сырых” событий, которые уже были из нее извлечены (Raw Events Table).

Далее в статье мы рассмотрим практический пример цепочки МП. Для этого будем использовать ленту последних изменений в Wiki, которая предоставляет поток событий, отражающих изменения, внесенные в различные ресурсы Wikimedia (какие страницы были изменены, кем и когда это произошло, а также описание самого изменения). Эти данные доступны в формате событий на стороне сервера. Свойство data примерного сообщения показано ниже:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
    "request_id": "ccbbbe2c-6e1b-4bb7-99cb-317b64cbd5dc",
    "id": "41c73232-5922-4484-82f3-34d45f22ee7a",
    "dt": "2024-03-26T09:13:09Z",
    "domain": "en.wiktionary.org",
    "stream": "mediawiki.recentchange",
    "topic": "eqiad.mediawiki.recentchange",
    "partition": 0,
    "offset": 4974797626
  },
  "id": 117636935,
  "type": "edit",
  "namespace": 0,
  "title": "MP3播放器",
  "title_url": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
  "comment": "clean up some labels; add missing space after *; {{zh-noun}} -> {{head|zh|noun}}, {{zh-hanzi}} -> {{head|zh|hanzi}} per [[WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun]], [[WT:RFDO#Template:zh-noun]]; fix some lang codes (manually assisted)",
  "timestamp": 1711444389,
  "user": "WingerBot",
  "bot": true,
  "notify_url": "https://en.wiktionary.org/w/index.php?diff=78597416&oldid=50133194&rcid=117636935",
  "minor": true,
  "patrolled": true,
  "length": {
    "old": 229,
    "new": 234
  },
  "revision": {
    "old": 50133194,
    "new": 78597416
  },
  "server_url": "https://en.wiktionary.org",
  "server_name": "en.wiktionary.org",
  "server_script_path": "/w",
  "wiki": "enwiktionary",
  "parsedcomment": "clean up some labels; add missing space after *; {{zh-noun}} -&gt; {{head|zh|noun}}, {{zh-hanzi}} -&gt; {{head|zh|hanzi}} per <a href=\"/wiki/Wiktionary:RFDO#All_templates_in_Category:Chinese_headword-line_templates_except_Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun</a>, <a href=\"/wiki/Wiktionary:RFDO#Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#Template:zh-noun</a>; fix some lang codes (manually assisted)"
}

Представим, что мы разрабатываем дашборд  для отслеживания вносимых изменений. Нас не интересуют отдельные правки, мы хотим поминутно отслеживать уникальное количество пользователей, вносящих изменения, уникальное количество изменяемых страниц и общее количество внесенных изменений.

Начнем с создания и последующего использования базы данных wiki:

CREATE DATABASE wiki;
USE wiki;

Создание движка таблицы Kafka

Далее давайте создадим таблицу wikiQueue, которая будет получать сообщения из Kafka-темы wiki_events, используя локальный Kafka брокер, запущенный на порту 9092.

Обратите внимание, что если вы используете ClickHouse Cloud, то для обработки поступающих из Kafka данных вам придется использовать ClickPipes.

CREATE TABLE wikiQueue(
    id UInt32,
    type String,
    title String,
    title_url String,
    comment String,
    timestamp UInt64,
    user String,
    bot Boolean,
    server_url String,
    server_name String,
    wiki String,
    meta Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
  'localhost:9092', 
  'wiki_events', 
  'consumer-group-wiki', 
  'JSONEachRow'
);

Таблица rawEvents содержит информацию о дате и времени события (dateTime), URL-адресе названия статьи (title_url), теме события (topic) и пользователе, внесшем изменения (user).

CREATE TABLE rawEvents (
    dateTime DateTime64(3, 'UTC'),
    title_url String,
    topic String,
    user String
) 
ENGINE = MergeTree 
ORDER BY dateTime;

Далее мы создадим следующее материализованное представление для записи данных в rawEvents

CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents AS 
SELECT toDateTime(timestamp) AS dateTime,
       title_url, 
       tupleElement(meta, 'topic') AS topic, 
       user
FROM wikiQueue
WHERE title_url <> '';

В данном случае МП будет использоваться для записи “сырых” данных событий, извлеченных из потока последних изменений в Wiki, в таблицу rawEvents. Это позволит нам сохранить эти данные в структурированном виде и использовать их для дальнейшего анализа.

Мы используем функцию toDateTime для преобразования временной метки в секундах эпохи (точка отсчета для измерения времени. Обычно это 00:00:00 UTC 1 января 1970 года) в объект DateTime (который представляет собой дату и время в удобном для работы формате). Мы также используем функцию tupleElement для извлечения свойства topic из объекта meta.

Хранение агрегатных состояний

Далее создадим таблицу для хранения агрегатных состояний, чтобы обеспечить инкрементную агрегацию. Агрегатные состояния хранятся в столбце с типом AggregateFunction(<aggregationType>, <dataType>).

При ведении уникального подсчета строковых значений String в датасете, необходимых для отслеживания уникальных пользователей и уникальных страниц, мы используем тип AggregateFunction(uniq, String). Таким образом, здесь применяется специальная функция агрегации. Например, если у нас есть логи пользователей или журнал посещений веб-страниц, мы можем выяснить, сколько уникальных пользователей посетило наш сайт или сколько уникальных страниц было посещено за определенный период времени.

Чтобы вести непрерывный подсчет изменений, который нам необходим для общего обновления данных, используем тип AggregateFunction(sum, UInt32). Этот тип данных UInt32 обозначает беззнаковое целое число размером 32 бита, что дает нам максимальное значение 4294967295. Это намного больше, чем количество обновлений, которые мы получим за одну минуту. Таким образом, мы можем применить эту функцию агрегации для непрерывного отслеживания суммарного количества обновлений данных за определенный период времени, не беспокоясь о достижении максимального значения.

Назовем эту таблицу byMinute. Ее определение приведено ниже:

CREATE TABLE byMinute
(
    dateTime DateTime64(3, 'UTC') NOT NULL,
    users AggregateFunction(uniq, String),
    pages AggregateFunction(uniq, String),
    updates AggregateFunction(sum, UInt32) 
)
ENGINE = AggregatingMergeTree() 
ORDER BY dateTime;

Материализованное представление, заполняющее эту таблицу, будет считывать данные из rawEvents и использовать комбинаторы -State для извлечения промежуточного состояния. Мы будем использовать функцию uniqState для подсчета уникальных пользователей и страниц, а также функцию sumState для подсчета изменений.

CREATE MATERIALIZED VIEW byMinute_mv TO byMinute AS 
SELECT toStartOfMinute(dateTime) AS dateTime,
       uniqState(user) as users,
       uniqState(title_url) as pages,
       sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;

На диаграмме ниже показана цепочка материализованных представлений и таблиц, которые мы создали на данный момент:

В настоящее время в системе отсутствуют данные, которые должны быть переданы или записаны в систему Kafka. Для того чтобы начать работу с этой таблицей или с данными в системе, необходимо выполнить определенные команды или действия, которые позволят запустить поток информации в Kafka. Как только это будет сделано, данные начнут поступать в таблицу, и мы сможем использовать их для различных целей, таких как анализ, обработка или хранение.

Давайте это осуществим, выполнив следующие команды.

curl -N https://stream.wikimedia.org/v2/stream/recentchange  |
awk '/^data: /{gsub(/^data: /, ""); print}' |
jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t wiki_events -Kø

Благодаря этой команде мы извлекаем свойство data из ленты последних изменений, создаем пару key:value с помощью jq, а затем передаем ее в Kafka посредством kcat.

Можно оставить эту команду работать некоторое время, для сбора достаточного количества данных. Затем написать запрос, чтобы увидеть, сколько изменений было внесено в ресурсы Wikimedia. Это может быть полезным для мониторинга активности в Wiki и анализа изменений, внесенных в различные ресурсы.

SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
    ┌────────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:53:00.000 │   248 │   755 │    1002 │
 2. │ 2024-03-26 15:52:00.000 │   429 │  1481 │    2164 │
 3. │ 2024-03-26 15:51:00.000 │   406 │  1417 │    2159 │
 4. │ 2024-03-26 15:50:00.000 │   392 │  1240 │    1843 │
 5. │ 2024-03-26 15:49:00.000 │   418 │  1346 │    1910 │
 6. │ 2024-03-26 15:48:00.000 │   422 │  1388 │    1867 │
 7. │ 2024-03-26 15:47:00.000 │   423 │  1449 │    2015 │
 8. │ 2024-03-26 15:46:00.000 │   409 │  1420 │    1933 │
 9. │ 2024-03-26 15:45:00.000 │   402 │  1348 │    1824 │
10. │ 2024-03-26 15:44:00.000 │   432 │  1642 │    2142 │
    └─────────────────────────┴───────┴───────┴─────────┘

Похоже, что все работает хорошо.

Добавление еще одного МП в цепочку

Мы запустили эту систему и накопили определенное количество данных, которые были записаны в таблицу byMinute с интервалом в 1 минуту. Теперь, после выполнения данной операции в течение некоторого времени, было бы полезно сгруппировать и разбить собранные данные на 10-минутные, а не только 1-минутные бакеты. Это можно сделать, написав следующий запрос к таблице byMinute:

SELECT
    toStartOfTenMinutes(dateTime) AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;

В результате получится нечто вроде этого, где значения в столбце dateTime будут теперь располагаться с шагом в 10 минут.

    ┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:50:00 │   977 │  4432 │    7168 │
 2. │ 2024-03-26 15:40:00 │  1970 │ 12372 │   20555 │
 3. │ 2024-03-26 15:30:00 │  1998 │ 11673 │   20043 │
 4. │ 2024-03-26 15:20:00 │  1981 │ 12051 │   20026 │
 5. │ 2024-03-26 15:10:00 │  1996 │ 11793 │   19392 │
 6. │ 2024-03-26 15:00:00 │  2092 │ 12778 │   20649 │
 7. │ 2024-03-26 14:50:00 │  2062 │ 12893 │   20465 │
 8. │ 2024-03-26 14:40:00 │  2028 │ 12798 │   20873 │
 9. │ 2024-03-26 14:30:00 │  2020 │ 12169 │   20364 │
10. │ 2024-03-26 14:20:00 │  2077 │ 11929 │   19797 │
    └─────────────────────┴───────┴───────┴─────────┘

Это хорошо работает с небольшим объемом информации, но когда приходится обрабатывать большие массивы, может понадобиться еще одна таблица, в которой будут храниться данные, разбитые на 10-минутные интервалы. Давайте создадим такую таблицу:

CREATE TABLE byTenMinutes
(
    dateTime DateTime64(3, 'UTC') NOT NULL,
    users AggregateFunction(uniq, String),
    pages AggregateFunction(uniq, String),
    updates AggregateFunction(sum, UInt32) 
)
ENGINE = AggregatingMergeTree() 
ORDER BY dateTime;

Далее создадим МП для заполнения этой таблицы. МП будет запрашивать таблицу byMinute, применяя запрос, подобный тому, который мы использовали для вычисления 10-минутных интервалов выше. Единственное отличие заключается в том, что вместо комбинаторов -Merge нам нужно использовать комбинаторы -MergeState, чтобы вернуть состояние агрегации данных byMinute, а не базовый результат.

Теоретически мы сможем сэкономить время вычислений, поскольку МП byMinute уже агрегировало данные в одноминутные бакеты. Теперь, вместо агрегирования “сырых” данных по секундам с нуля в 10-минутные интервалы, мы используем одноминутные бакеты.

Материализованное представление представлено ниже:

CREATE MATERIALIZED VIEW byTenMinutes_mv TO byTenMinutes AS
SELECT toStartOfMinute(dateTime) AS dateTime,
       uniqMergeState(users) as users,
       uniqMergeState(pages) as pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;

На следующей диаграмме показана цепочка материализованных представлений, которую мы создали:

Если мы запросим данные из таблицы byTenMinutes, то не увидим в ней никакой информации, потому что она еще не была заполнена. Когда таблица byTenMinutes начнет заполняться, она будет собирать только новые данные, которые будут поступать в таблицу byMinute. Это означает, что старые данные, которые уже накопились в byMinute, не будут автоматически перенесены в таблицу byTenMinutes.

Однако все не так плохо, как кажется. Можно написать запрос, чтобы восстановить прежние данные. Он будет выполнять функцию обратной загрузки (backfill) данных, то есть заполнит byTenMinutes старыми данными из таблицы byMinute:

INSERT INTO byTenMinutes 
SELECT toStartOfTenMinutes(dateTime),
       uniqMergeState(users) AS users, uniqMergeState(pages) AS pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;

Это позволит нам получить полную картину данных, как новых, так и старых, в таблице byTenMinutes.

Затем мы можем составить следующий запрос к byTenMinutes, чтобы вернуть данные, сгруппированные по 10-минутным бакетам, которые применим для дальнейшего анализа или визуализации:

SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byTenMinutes
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;

Мы получим те же результаты, что и при запросе таблицы byMinute:

    ┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:50:00 │   977 │  4432 │    7168 │
 2. │ 2024-03-26 15:40:00 │  1970 │ 12372 │   20555 │
 3. │ 2024-03-26 15:30:00 │  1998 │ 11673 │   20043 │
 4. │ 2024-03-26 15:20:00 │  1981 │ 12051 │   20026 │
 5. │ 2024-03-26 15:10:00 │  1996 │ 11793 │   19392 │
 6. │ 2024-03-26 15:00:00 │  2092 │ 12778 │   20649 │
 7. │ 2024-03-26 14:50:00 │  2062 │ 12893 │   20465 │
 8. │ 2024-03-26 14:40:00 │  2028 │ 12798 │   20873 │
 9. │ 2024-03-26 14:30:00 │  2020 │ 12169 │   20364 │
10. │ 2024-03-26 14:20:00 │  2077 │ 11929 │   19797 │
    └─────────────────────┴───────┴───────┴─────────┘

Всё о работе с ClickHouse — от установки и настройки до продовых решений — можно узнать на онлайн-курсе в OTUS «ClickHouse для инженеров и архитекторов БД».

Комментарии (0)