В этой статье будут описаны подводные камни, на которые я натыкался при использовании одновременно материализованных представлений и движка ReplacingMergeTree в ClickHouse. Для опытных пользователей ClickHouse эта информация возможно будет уже не новой, но надеюсь, что смогу сэкономить много времени тем, кто недавно начал свое знакомство или только готовится начать.

Это первая часть, в которой опишу основные термины: что такое материализованные представления и ReplacingMergeTree, как работают и какие есть особенности.

Материализованные представления

Материализованное представление (далее МП) это по сути такая же таблица, которая хранит данные, взятые их другой таблицы. ClickHouse (далее CH) следит за вставкой данных в таблице-источнике и помещает их в МП в том виде, в котором вы это описываете. Это как обычное представление View, только данные хранятся на диске, а не формируются при каждом запросе. Разберем простой пример использования - логирование кодов ответов на HTTP запросы для нескольких приложений.

Создаем таблицу:

CREATE TABLE responses (
    time DateTime,
    app String,
    status UInt16
) engine=MergeTree()
ORDER BY (time, app)

Допустим что rps суммарно на все приложения 10к. Тогда получается что через месяц мы получим 432 000 000 записей, а через полгода уже 2 592 000 000. Для CH это не такие уж и большие объемы, и на хорошем железе они вряд ли создадут ощутимые проблемы, однако скорость ответа на некоторые аналитические запросы может упасть до уровня, который будет не подходить под бизнес-задачи. Также к статусу по желанию заказчика со временем может добавиться значение времени ответа или/и размера тела и т.д., что ощутимо увеличит таблицу. Наш пример больше для того чтобы понять особенности. (я, кстати, с проблемой времени ответа на аналитических запросах столкнулся когда уже перебиралось более 1 лярда записей и время ответа перестало отвечать бизнес-требованиям).

Давайте заполним вначале исходную таблицу какими-то данными. Предположим, что у нас три приложения: search, api, auth, на которые идет HTTP трафик.

INSERT INTO responses (time, app, status) VALUES
('2021-10-11 12:00:00', 'search', 200),
('2021-10-11 12:00:00', 'search', 200),
('2021-10-11 12:00:00', 'search', 200),
('2021-10-11 12:00:01', 'search', 200),
('2021-10-11 12:00:01', 'search', 200),
('2021-10-11 12:00:02', 'search', 500),
('2021-10-11 12:00:02', 'search', 200),

('2021-10-11 12:00:00', 'api', 200),
('2021-10-11 12:00:00', 'api', 500),
('2021-10-11 12:00:01', 'api', 200),
('2021-10-11 12:00:01', 'api', 200),
('2021-10-11 12:00:01', 'api', 403),
('2021-10-11 12:00:02', 'api', 200),

('2021-10-11 12:00:00', 'auth', 200),
('2021-10-11 12:00:01', 'auth', 500),
('2021-10-11 12:00:01', 'auth', 200),
('2021-10-11 12:00:02', 'auth', 200),
('2021-10-11 12:00:03', 'auth', 403),
('2021-10-11 12:00:03', 'auth', 200)

Наша задача - выводить данные суммированные поминутно по каждому приложению. Тут пока все просто:

SELECT 
    time, 
    app, 
    status,
    count(*) as count 
FROM responses 
GROUP BY time, app, status
ORDER BY time

Ответ:

┌────────────────time─┬─app────┬─status─┬─count─┐
│ 2021-10-11 12:00:00 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:00 │ api    │    200 │     1 │
│ 2021-10-11 12:00:00 │ search │    200 │     3 │
│ 2021-10-11 12:00:00 │ api    │    500 │     1 │
│ 2021-10-11 12:00:01 │ search │    200 │     2 │
│ 2021-10-11 12:00:01 │ api    │    403 │     1 │
│ 2021-10-11 12:00:01 │ auth   │    500 │     1 │
│ 2021-10-11 12:00:01 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:01 │ api    │    200 │     2 │
│ 2021-10-11 12:00:02 │ search │    200 │     1 │
│ 2021-10-11 12:00:02 │ api    │    200 │     1 │
│ 2021-10-11 12:00:02 │ search │    500 │     1 │
│ 2021-10-11 12:00:02 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:03 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:03 │ auth   │    403 │     1 │
└─────────────────────┴────────┴────────┴───────┘

Выборку смотрит заказчик, которому сильно не нравится ждать по несколько секунд или минут (как было в моем случае), а может и нас самих это не устраивает. Для решения этой проблемы могут хорошо подойти материализованные представления. В них можно хранить данные уже просуммированные, например, посекундно. Для нашего примера это подойдет лучше всего, а в реальности это могут быть и минуты или даже часы или дни.

Создаем агрегированную таблицу:

CREATE TABLE responses_by_sec (
    day     DateTime,
    app     String,
    status  UInt16,
    count   UInt64
) engine=MergeTree()
ORDER BY (day, app)

А теперь нужно связать агрегированную таблицу и таблицу-источник.

CREATE MATERIALIZED VIEW responses_by_sec_mat_view TO responses_by_sec AS
SELECT 
    time as day, 
    app, 
    status,
    count(*) as count 
FROM responses 
GROUP BY time, app, status

Если сейчас выполнить запрос

SHOW TABLES

То увидим:

┌─name──────────────────────┐
│ responses                 │
│ responses_by_sec          │
│ responses_by_sec_mat_view │
└───────────────────────────┘

МП выглядит как обычная таблица со всеми вытекающими. Например, можно посмотреть как она устроена:

SHOW CREATE TABLE responses_by_sec_mat_view
┌─statement───────────────────────────────────────────────────────────────────────────────────┐
│ CREATE MATERIALIZED VIEW t.responses_by_sec_mat_view TO t.responses_by_sec
(
    `day` DateTime,
    `app` String,
    `status` UInt16,
    `count` UInt64
) AS
SELECT
    time AS day,
    app,
    status,
    count(*) AS count
FROM t.responses
GROUP BY
    time,
    app,
    status │
└─────────────────────────────────────────────────────────────────────────────────────────────┘

Теперь очищаем таблицу источник (можно просто указать POPULATE при создании МП и оно сразу заполнится данными)

TRUNCATE responses

И заполняем заново тем же самым запросом, что был выше. Смотрим результат:

SELECT * FROM responses_by_sec ORDER BY day ASC

┌─────────────────day─┬─app────┬─status─┬─count─┐
│ 2021-10-11 12:00:00 │ api    │    200 │     1 │
│ 2021-10-11 12:00:00 │ api    │    500 │     1 │
│ 2021-10-11 12:00:00 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:00 │ search │    200 │     3 │
│ 2021-10-11 12:00:01 │ api    │    403 │     1 │
│ 2021-10-11 12:00:01 │ api    │    200 │     2 │
│ 2021-10-11 12:00:01 │ auth   │    500 │     1 │
│ 2021-10-11 12:00:01 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:01 │ search │    200 │     2 │
│ 2021-10-11 12:00:02 │ api    │    200 │     1 │
│ 2021-10-11 12:00:02 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:02 │ search │    200 │     1 │
│ 2021-10-11 12:00:02 │ search │    500 │     1 │
│ 2021-10-11 12:00:03 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:03 │ auth   │    403 │     1 │
└─────────────────────┴────────┴────────┴───────┘

Данные уже просуммированы по-секундно. Если теперь выборки для клиентов делать из этой новой таблицы, то это в сотни, а иногда даже тысячи раз будет сокращать количество записей, по которым мы смотрим выборку или что-то считаем. Оптимизация ощутимая.

Что важно учитывать

Первое. Если из таблицы источника данные удалить:

ALTER TABLE responses DELETE WHERE time='2021-10-11 12:00:00'

то на МП это НЕ повлияет, удаляйте ручками:

SELECT * FROM responses_by_sec ORDER BY day ASC

┌─────────────────day─┬─app────┬─status─┬─count─┐
│ 2021-10-11 12:00:00 │ api    │    200 │     1 │
│ 2021-10-11 12:00:00 │ api    │    500 │     1 │
│ 2021-10-11 12:00:00 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:00 │ search │    200 │     3 │
│ 2021-10-11 12:00:01 │ api    │    403 │     1 │
│ 2021-10-11 12:00:01 │ api    │    200 │     2 │
│ 2021-10-11 12:00:01 │ auth   │    500 │     1 │
│ 2021-10-11 12:00:01 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:01 │ search │    200 │     2 │
│ 2021-10-11 12:00:02 │ api    │    200 │     1 │
│ 2021-10-11 12:00:02 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:02 │ search │    200 │     1 │
│ 2021-10-11 12:00:02 │ search │    500 │     1 │
│ 2021-10-11 12:00:03 │ auth   │    200 │     1 │
│ 2021-10-11 12:00:03 │ auth   │    403 │     1 │
└─────────────────────┴────────┴────────┴───────┘

Второе. Агрегация идет ТОЛЬКО на данные, которые вы вставляете в текущей пачке. То есть если вставить данные за одно и тоже время, но разными запросами, то они не просуммируются между собой. Это же кстати касается любых других агрегирующих функций в CH. Вероятно за счет таких жертв обеспечивается очень высокая скорость вставки.

Варианты решения:

  • Накапливать пачку за одну секунду и кидать ее разом в CH. Это кстати наиболее предпочтительный способ с точки зрения производительности.

  • Заполнять агрегированную таблицу не с помощью МП а руками, скажем по крону раз в минуту брать данные за прошлую минуту и, нужным образом обработав, вставлять в агрегат:

INSERT INTO responses_by_sec (day, app, status, count) 
SELECT
    time AS day,
    app,
    status,
    count(*) AS count
FROM responses
GROUP BY
    time,
    app,
    status

ReplacingMergeTree.

Как описано в документации

выполняет удаление дублирующихся записей с одинаковым значением ключа сортировки (секция ORDER BY, не PRIMARY KEY).

Это может быть полезно когда возможны по разным причинам дубликаты записей. Допустим, что в прошлом примере данные собираются по крону раз в пять минут и тянутся из другой системы, которая эти метрики каким-то образом собирает. При таком подходе рано или поздно возникнет ситуация когда данные придут за один и тот же период дважды. Ну или в самом синке будет ошибка, которая приведет к некорректному поведению. Кейсы можно придумывать долго и разнообразно.

Чтобы на примере посмотреть как это работает можно немного усложнить предыдущую задачу. Предположим, что потребовалось хранить максимальное время ответа приложения по-секундно. При этом данные попадают в таблицу из какого-то другого источника. Важно то, что в таблице не должно быть записей с одинаковым временем для одного и того же приложения.

Сама таблица будет выглядеть следующим образом:

CREATE TABLE responses_time (
    time DateTime,
    app String,
    max_time UInt16
) engine=ReplacingMergeTree()
ORDER BY (time, app)

ReplacingMergeTree используется как раз для обеспечения той самой уникальности полей, заданных в ORDER BY (time, app). И работает это посредством перезатирания более старых дубликатов.

Итак, вставка данных:

INSERT INTO responses_time (time, app, max_time) VALUES
('2021-10-11 12:00:00', 'search', 500),
('2021-10-11 12:00:00', 'search', 600),
('2021-10-11 12:00:00', 'search', 700),
('2021-10-11 12:00:01', 'search', 250),
('2021-10-11 12:00:01', 'search', 251),
('2021-10-11 12:00:02', 'search', 310),
('2021-10-11 12:00:02', 'search', 320)

Результат:

SELECT * FROM responses_time

┌────────────────time─┬─app────┬─max_time─┐
│ 2021-10-11 12:00:00 │ search │      700 │
│ 2021-10-11 12:00:01 │ search │      251 │
│ 2021-10-11 12:00:02 │ search │      320 │
└─────────────────────┴────────┴──────────┘

CH как и ожидалось оставил только уникальные значения, которые были последними по списку. Но! Данные были вставлены в одном запросе и это имеет значение. Проверяем вставку, но уже отдельными запросами (с предварительной очисткой таблицы, для простоты эксперимента).

INSERT INTO responses_time (time, app, max_time) VALUES
('2021-10-11 12:00:00', 'search', 500),
('2021-10-11 12:00:01', 'search', 250),
('2021-10-11 12:00:02', 'search', 310)

А теперь второй запрос с дубликатами:

INSERT INTO responses_time (time, app, max_time) VALUES
('2021-10-11 12:00:00', 'search', 700),
('2021-10-11 12:00:01', 'search', 251),
('2021-10-11 12:00:02', 'search', 320)

Результат:

SELECT * FROM responses_time

┌────────────────time─┬─app────┬─max_time─┐
│ 2021-10-11 12:00:00 │ search │      500 │
│ 2021-10-11 12:00:01 │ search │      250 │
│ 2021-10-11 12:00:02 │ search │      310 │
└─────────────────────┴────────┴──────────┘
┌────────────────time─┬─app────┬─max_time─┐
│ 2021-10-11 12:00:00 │ search │      700 │
│ 2021-10-11 12:00:01 │ search │      251 │
│ 2021-10-11 12:00:02 │ search │      320 │
└─────────────────────┴────────┴──────────┘

CH вставил все, включая дубликаты, но в выдаче они разделены. Дело в том, что CH подчищает дубликаты не сразу, а спустя какое-то время. По всей видимости это асинхронный механизм и надеяться на него не стоит. В случаях когда данные из этой таблицы используются сразу, то это может сильно исказить результаты. Правда, есть возможность получить результат не дожидаясь мержа CH, для этого надо добавить FINAL в запросе:

SELECT * FROM responses_time FINAL ORDER BY time ASC

┌────────────────time─┬─app────┬─max_time─┐
│ 2021-10-11 12:00:00 │ search │      700 │
└─────────────────────┴────────┴──────────┘
┌────────────────time─┬─app────┬─max_time─┐
│ 2021-10-11 12:00:01 │ search │      251 │
│ 2021-10-11 12:00:02 │ search │      320 │
└─────────────────────┴────────┴──────────┘

Но делать так в проде категорически не рекомендуется, так как это повлечет за собой заметную деградацию производительности.

Еще один вариант решения как и в предыдущей статье: накапливать пачку за данных одну секунду и кидать ее разом в ClickHouse. Это кстати наиболее предпочтительный способ с точки зрения производительности, но разумеется далеко не всегда он возможен.

Во второй части я расскажу как подружить вместе материализованные представления и ReplacingMergeTree.

Комментарии (2)


  1. seriych
    27.03.2022 16:04
    +2

    Агрегация данных, вставляемых разными пачками решается через комбинатор State или SimpleState
    В вашем примере в матвью вместо count(*) можно сделать sumSimpleState(1), и селектить потом через sumMerge.

    "Данные были вставлены в одном запросе и это имеет значение."
    Это имеет значение, если выставлена настройка insert_deduplicate = 1. Для более эффективной вставки ее выставляют в 0, тогда и в одной вставке данные не будут схлопываться сразу.

    И у вас в примерах во всех таблицах в ключе сортировки стоит поменять порядок: ORDER BY (app, time). Для полного счастья что-то вроде такого:
    PARTITION BY toYYYYMM(date)
    ORDER BY (date, app, time)

    (при желании можно не делать отдельную колонку с датой, но так удобнее в запросах, а сожмется она в ноль)
    После этого может оказаться, что и без матвью скорости хватает.


    1. antgubarev Автор
      27.03.2022 16:05

      Спасибо обязательно попробую