Мы все чаще используем ClickHouse, и эта СУБД хорошо себя зарекомендовала для построения хранилищ данных и выполнения аналитических проектов. Благодаря column-oriented архитектуре ClickHouse сочетает в себе высокую производительность при вставке больших данных (миллионы строк в секунду) и невероятно быстрый full-scan по большим таблицам. Однако, за такую скорость работы этой СУБД приходится платить определёнными ограничениями, а пользователю – иметь ввиду нюансы (некоторые из них мы описали в предыдущей статье), которые непосредственно влияют на проектирование и разработку решений на базе Clickhouse.

В данной статье покажем – с какими трудностями мы столкнулись из-за архитектурных ограничений ClickHouse и как их преодолели, сократив потребляемые ClickHouse-сервером ресурсы почти в 50 раз.

Какая задача перед нами стояла

В ClickHouse в режиме онлайн поступает поток транзакций, по сути представляющий из себя исторические данные о том, сколько каких ресурсов (продуктов/товаров) с какой стоимостью было перемещено из торговых точек на склад и обратно. Над данными, постоянно поступающими в БД, выполняется ряд трансформаций, такие как кастомная дедубликация транзакций, обогащение данных по справочникам и агрегация.

В результате трансформаций мы собираем витрину с оборотами следующей структуры:

DROP TABLE IF EXISTS dm_turnover;
CREATE TABLE dm_turnover
(
    `date` Date,
    `pos` String,
    `storage` String,
    `productCode` UInt32,
    `amount` Decimal(38, 3),
    `cost` Decimal(38, 6),
    `currency` String,
    `supplyNumber` UInt64,
    `updateTime` DateTime('Europe/Moscow')
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(date)
ORDER BY (date, pos, storage, productCode);

Ключевая часть задачи заключается в том, чтобы строить ежедневную витрину с остатками – фактически это ежедневная накопительная информация из витрины с оборотами. DDL витрины с остатками:

DROP TABLE IF EXISTS dm_balance;
CREATE TABLE dm_balance
(
    `date` Date,
    `pos` String,
    `storage` String,
    `productCode` UInt32,
    `amount` Decimal(38, 3),
    `cost` Decimal(38, 6),
    `currency` String,
    `updateTime` DateTime('Europe/Moscow')
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(date)
ORDER BY (date, pos, storage, productCode);

Как видно из DDL витрины остатков, её структура практически идентична витрине оборотов. Однако методика наполнения витрины остатков сильно отличается, что вызывает определённые трудности, о которых мы расскажем дальше.

Объемы данных

Для начала – немного про объёмы данных. Витрина оборотов за 2022 год ежедневно собирала от 3.6 млн до 7.8 млн строк и занимала около 29 GB места на диске. Ежедневые данные из витрины оборотов складываются друг с другом и агрегируются по ключу (pos, storage, productCode) в витрину остатков.

Логика построения витрины остатков следующая (на примере нескольких дат):

( '2022-12-31' in dm_balance ) = ( '2022-12-31' in dm_turnover ) UNION ( '2022-12-30' in dm_balance )
( '2022-12-30' in dm_balance ) = ( '2022-12-30' in dm_turnover ) UNION ( '2022-12-29' in dm_balance )
( '2022-12-29' in dm_balance ) = ( '2022-12-29' in dm_turnover ) UNION ( '2022-12-28' in dm_balance )
...

Как мы видим, витрина остатков собирается из витрины оборотов и имеет накопительный эффект: то, что было накоплено в остатках к вчерашней дате, объединяется с оборотами на текущую дату. И так каждый день.

Из такой логики следует, что количество строк в витрине остатков может только расти вместе с ежедневным появлением новых сочетаний (pos, storage, productCode) в витрине оборотов. Витрина остатков в 2022 году ежедневно собирала от 65 млн до 143 млн строк ежедневно, а её вес за год составил более 650 GB. Количество строк и ежесуточные объемы данных по-прежнему увеличиваются с каждым днём.

Трудности построения витрины остатков

Нам необходимо поддерживать витрину остатков в актуальном состоянии при условии, что витрина оборотов может дополняться/изменяться в предыдущих датах. Например, если пришла пачка корректирующих транзакций в дату T-60 (~ два месяца ранее), тогда в дате T-60 изменилась витрина оборотов, и нам нужно перестроить витрину остатков последовательно в каждой дате, начиная с T-60 по текущий день. Формально перестроение витрины остатков в одной дате выполняется следующей последовательностью запросов:

ALTER TABLE tmp_balance DROP PARTITION :partKey; -- (*)

INSERT INTO tmp_balance
WITH :partKey AS partDate
SELECT 
    toDate(toString(partDate)) date, 
    pos, 
    storage, 
    productCode, 
    SUM(amount) amount, 
    SUM(cost) cost,
    currency,
    MAX(updateTime) updateTime
FROM (
    SELECT pos, storage, productCode, amount, cost, currency, updateTime 
    FROM {CH_SOURCE_TABLE} 
    WHERE toYYYYMMDD(date) = partDate
    UNION ALL
    SELECT pos, storage, productCode, amount, cost, currency, updateTime
    FROM {CH_TABLE}
    WHERE date = toDate(toString(partDate)) - INTERVAL 1 DAY
) GROUP BY pos, storage, productCode, currency;

ALTER TABLE dm_balance DROP PARTITION :partKey;

ALTER TABLE tmp_balance MOVE PARTITION :partKey TO TABLE dm_balance;

* Небольшая ремарка – почему мы используем промежуточную таблицу tmp_balance и манипулируем целыми партициями. Это еще одно следствие особенностей архитектуры ClickHouse, в частности –невозможности быстро и часто выполнять классические DML-операции DELETE/UPDATE. Данные операции выполняются в ClickHouse как DDL (см. https://clickhouse.com/docs/en/guides/developer/mutations/).Следовательно перестроение витрин целыми партициями гораздно быстрее и продуктивнее:  вместо тяжеловесных и долгих UPDATE/DELETE мы используем быстрые  INSERT и DROP/MOVE PARTITION. По этой же причине мы не стали использовать инструмент dbt, так как  в нём можно выполнять только DML операции, и он оказался не пригоден для нашей задачи из-за скорости работы и нерационального использования ресурсов сервера ClickHouse. В других задачах (на меньших объемах/только с инкрементальным обновлением данных/со слабыми требованиями по скорости) несомненно dbt будет удобен, но не в нашей текущей задаче.

Для наглядности мы изобразили то, как осуществляется актуализация витрины остатков при изменении витрины оборотов в предыдущей дате:

Итак, основная трудность, с которой мы столкнулись – нехватка оперативной памяти на выполнение запроса. Остро эта проблема встаёт, когда приходится делать запрос с подзапросом. Результирующая выборка подзапроса должна помещаться в оперативную память сервера ClickHouse и не превышать max_memory_usage/max_memory_usage_for_user, иначе запрос прервётся с ошибкой DB::Exception: Memory limit (for query) exceeded.

По нашей логике построения витрины остатков без подзапроса никак, так как нам нужно объединять и группировать данные из двух витрин.

При объединении 143 млн строк из витрины остатков с 7.8 млн строк из витрины оборотов для выполнения запроса "INSERT INTO tmp_balance" (запрос приведён выше) требуется до 55 GB оперативной памяти. При этом нужно учитывать, что это не единственный запрос, выполняющийся на хосте ClickHouse. В базе данных есть и другие параллельные активности, связанные с трансформацией и получением данных (актуализация той же витрины оборотов, например).

Получается, что для содержания только одной витрины остатков в актуальном состоянии нам необходим хост ClickHouse с ~ 64 GB оперативной памяти. Да, можно постоянно увеличивать ресурсы (capacity) сервера Clickhouse вместе с ростом объёмов и система будет работать исправно, но назвать это рациональным решением проблемы – нельзя, это будет скорее временным work around.

Но мы нашли решение, позволяющее решить задачу по затратам на аппаратные ресурсы без значительной потери производительности и скорости обработки данных.

Как мы это решили

Решением данной проблемы оказалось изменение партиционирования витрины с остатками на более мелкие части и доработка приложения для трансформации. Если раньше одна партиция содержала данные за целые сутки (за одну дату), то теперь данные за одни сутки распределены по 50 разным партициям, которые различаются друг от друга по остатку (mod) кода продукта. Итоговый DDL витрины с остатками следующий:

DROP TABLE IF EXISTS dm_balance;
CREATE TABLE dm_balance
(
    `date` Date,
    `pos` String,
    `storage` String,
    `productCode` UInt32,
    `amount` Decimal(38, 3),
    `cost` Decimal(38, 6),
    `currency` String,
    `updateTime` DateTime('Europe/Moscow')
)
ENGINE = MergeTree
PARTITION BY toUInt64(concat(toString(toYYYYMMDD(date)), toString(productCode % 50)))
ORDER BY (date, productCode, pos, storage);

Нужно отметить, что вместе с «дроблением» витрины остатков на более мелкие партиции, усложнилась и логика приложения для трансформации. Если раньше мы просто и последовательно перестраивали подряд суточные партиции с даты T-N до Т, то теперь актуализация остатков в определённой дате должна происходить только для определённых «мелких» партиций – тех, в которых были изменения в витрине оборотов.

Для лучшего понимания мы изобразили, как теперь актуализируется витрина остатков:

Теперь процесс перестроения витрины остатков чем-то напоминает капли дождя на окне: отдельные капли спускаются с самого верха по стеклу, прихватывая по пути другие капли:)

Далее покажем сам алгоритм трансформаций в виде кода. 

Обычно мы запускаем трансформации на бессерверных технологиях (AWS Lambda): это выходит дешевле и удобнее по сравнению с содержанием виртуальной машины в облаке или On-Premise, так как тарифицируется только количество запусков и потраченные GB*hour. В данной статье мы представим универсальный код без деталей инфраструктуры, в которой он исполняется, чтобы вы смогли адаптировать его под свои нужды и инфраструктуру.

Приложение для трансформации реализует следующие методы:

1) Запуск по расписанию. Приложение сначала проверяет, что работает в Singleton режиме (дальше подробнее опишем – как это сделали), и если другие экземпляры приложения в текущий момент не работают, то определяет партиции в витрине оборотов, в которых произошли изменения (есть расхождения по update_time с витриной остатков). Если произошли изменения в двух партициях с одинаковым модулем %50, то остается самая ранняя по дате. На выходе этого метода – даты и номера партиций, которые необходимо перестроить. Дальше по выходным данным вызывается другой метод приложения. Выбор метода передачи данных между функциями приложения оставляем на усмотрение разработчика – это может быть прямой вызов из некоторого метода-контроллёра, либо передача через очередь, БД и т.п.

2) Запуск для трансформации одной даты. Рекурсивный метод, который обрабатывает партиции в самой ранней дате в пачке из запроса, преобразует запрос –передвигает перестроенные только что партиции в следующую дату, и передаёт вызов сам себе для преобразования в следующей по очереди дате.

Для начала опишем вспомогательные метод и таблицу, которые помогают работать приложению в Singleton-режиме, независимо от количества обрабатываемых в текущий момент партиций и глубины обработки. В ClickHouse создадим следующую таблицу:

DROP TABLE IF EXISTS dm_balance_log;
CREATE TABLE dm_balance_log
(
    `parent` UUID,
    `child` UUID,
    `op` UInt8,
    `insertTime` DateTime('Europe/Moscow') DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(insertTime)
ORDER BY insertTime
TTL toStartOfDay(insertTime) + INTERVAL 3 DAY;

Таблица послужит для хранения журнала исполнения и в то же время поможет работать приложению в Singleton-режиме при запуске по расписанию.

Для записи в таблицу dm_balance_log мы реализуем следующий вспомогательный метод:

def trail(parentId, childId, op):
    query = '''INSERT INTO {CH_TABLE}_log (* EXCEPT insertTime) VALUES ('{parentId}', '{childId}', {op})'''.format(
        parentId=parentId, childId=childId, op=op, CH_TABLE=CH_TABLE
    )
    clickhouse.execute(query)

Далее реализуем метод, запускаемый периодически по расписанию, который осуществляет поиск изменённых партиций в исходной витрине оборотов:

import os
from clickhouse_driver import Client
from datetime import datetime, timedelta
import pytz
 
CH_HOST = os.getenv('CH_HOST')
CH_USER = os.getenv('CH_USER')
CH_PASSWORD = os.getenv('CH_PASSWORD')
CH_DB = os.getenv('CH_DB')

CH_SOURCE_TABLE='dm_turnover'
CH_TABLE='dm_balance'
CH_TMP_TABLE='tmp_balance'
CH_MODULO_BASE=50

clickhouse = Client(host=CH_HOST,
                    user=CH_USER,
                    password=CH_PASSWORD,
                    database=CH_DB,
                    secure=True)
 
def changed_partitions(requestId):
    query = '''
            WITH {CH_MODULO_BASE} AS MODULO_BASE,
            (SELECT
                parent=child AND op = 1
                OR insertTime < now() - INTERVAL 15 MINUTE
            FROM {CH_TABLE}_log ORDER BY insertTime DESC LIMIT 1) AS isUnlocked,           
            rawParts AS (
                SELECT concat(toString(toYYYYMMDD(date)), toString(modulo(productCode,MODULO_BASE))) partKey, max(updateTime) maxUpdateTime
                FROM {CH_SOURCE_TABLE}
                WHERE (isUnlocked = 1 OR isNull(isUnlocked))
                AND date >= toDate('20220101')
                AND updateTime > toStartOfMinute(now()) - INTERVAL {INTERVAL}
                group by partKey),
            parts AS (
                SELECT _partition_id as partKey, max(updateTime) maxUpdateTime
                FROM {CH_TABLE}
                WHERE _partition_id IN (SELECT partKey FROM rawParts)
                GROUP BY partKey),
            changedParts AS (
            SELECT substring(partKey, 1, 8) dt, toUInt64(substring(partKey, 9)) modul
            FROM rawParts rp LEFT JOIN parts p USING partKey
            WHERE rp.maxUpdateTime > p.maxUpdateTime
            ORDER BY modul, dt
            LIMIT 1 BY modul)
            SELECT dt, groupArray(modul) modules FROM changedParts
            GROUP BY dt
            ORDER BY dt
        '''.format(
        INTERVAL='1 HOUR',
        CH_SOURCE_TABLE=CH_SOURCE_TABLE,
        CH_TABLE=CH_TABLE,
        CH_MODULO_BASE=CH_MODULO_BASE
    )
 
    msg = {
        'process': {},
        'next': [],
        'parentId': requestId
    }
 
    cnt = 0
    settings = {'max_block_size': 100000}
    rows_gen = clickhouse.execute_iter(query, settings=settings)
    for row in rows_gen:
        dt = row[0]
        modules = row[1]
        if cnt == 0:
            msg['process'] = {'dt': dt, 'modules': modules}
        else:
            msg['next'].append({'dt': dt, 'modules': modules})
        cnt += 1
 
    if cnt > 0:
        # invoke "transform" method with msg
        ...
    else:
        # nothing to process / skip
        ...

Далее реализуем метод, который осуществляет непосредственно трансформацию в самой ранней дате из запроса и при необходимости рекурсивно вызывает сам себя:

# ... the same import and global vars
# ... the same clickhouse client initialization
# ... the implementation of method "changed_partitions" 

def transform(msg, requestId):
    partDate = msg['process']['dt']
    partModules = msg['process']['modules']
    parentId = msg['parentId']
    trail(parentId=parentId, childId=requestId, op=0)

    for p in partModules:
        partModule = str(p)
        partKey = partDate + partModule

        clickhouse.execute('ALTER TABLE {CH_TMP_TABLE} DROP PARTITION {partKey};'.format(
            CH_TMP_TABLE='dm_balance', partKey=partKey))

        query = '''
                INSERT INTO {CH_TMP_TABLE}
                WITH {CH_MODULO_BASE} AS MODULO_BASE,
                toString({partDate}) AS partDate,
                {partModule} AS partModule
                SELECT 
                    toDate(partDate) date, 
                    pos, 
                    storage, 
                    productCode, 
                    SUM(amount) amount, 
                    SUM(cost) cost, 
                    currency,
                    MAX(updateTime) updateTime
                FROM (
                    SELECT pos, storage, productCode, amount, cost, currency, updateTime 
                    FROM {CH_SOURCE_TABLE} 
                    WHERE _partition_id = partDate
                    AND modulo(productCode, MODULO_BASE) = partModule
                    UNION ALL
                    SELECT pos, storage, productCode, amount, cost, currency, updateTime
                    FROM {CH_TABLE}
                    WHERE _partition_id = concat(toString(toYYYYMMDD(toDate(partDate) - INTERVAL 1 DAY)), toString(partModule))
                ) GROUP BY pos, storage, productCode, currency;
            '''.format(
            CH_TMP_TABLE=CH_TMP_TABLE,
            CH_SOURCE_TABLE=CH_SOURCE_TABLE,
            CH_TABLE=CH_TABLE,
            partDate=partDate,
            partModule=partModule,
            CH_MODULO_BASE=CH_MODULO_BASE
        )
        clickhouse.execute(query)

        query = 'ALTER TABLE {CH_TABLE} DROP PARTITION {partKey};'.format(
            CH_TABLE=CH_TABLE, partKey=partKey
        )
        clickhouse.execute(query)

        query = 'ALTER TABLE {CH_TMP_TABLE} MOVE PARTITION {partKey} TO TABLE {CH_TABLE};'.format(
            CH_TABLE=CH_TABLE, CH_TMP_TABLE=CH_TMP_TABLE, partKey=partKey
        )
        clickhouse.execute(query)

    trail(parentId=parentId, childId=requestId, op=1)

    nextDate = datetime.strptime(partDate, '%Y%m%d').replace(
        tzinfo=pytz.timezone('Europe/Moscow')).date() + timedelta(days=1)
    today = datetime.now().replace(
        tzinfo=pytz.timezone('Europe/Moscow')).date()

    if nextDate <= today:
        msg['process']['dt'] = nextDate.strftime('%Y%m%d')
        if 'next' in msg and len(msg['next']) > 0 and 'dt' in msg['next'][0] and msg['next'][0]['dt'] == msg['process']['dt']:
            msg['process']['modules'] = msg['process']['modules'] + \
                msg['next'][0]['modules']
            msg['process']['modules'] = sorted(set(msg['process']['modules']))
            del msg['next'][0]
        # ... invoke transform recursively with msg
    else:
        trail(parentId=parentId, childId=parentId, op=1)

Хотим заметить, что приведенный код не является эталонным образцом для Python-разработчиков, и в нём наверняка есть, что улучшить. Мы всего лишь хотели формализовать и продемонстировать алгоритм, по которому работает приложение для решения данной задачи.

В целом данный подход применим для построения витрин остатков, когда нужно собирать (и постоянно обновлять) в одном месте результат запроса с оконной функцией (window function). Однако, по целой таблице данный запрос не выполняется из-за нехватки оперативной памяти ввиду больших объёмов в исходных таблицах.

Регулируя степень партиционирования (в нашем случае – количество партиций в одной дате), мы можем контролировать потребление памяти на сервере Clickhouse от запросов приложения для трансформаций.

Заключение

В результате, имея делитель, по которому дробятся даты по коду продукта на отдельные партиции, равный 50, а также выполняя каждую атомарную операцию последовательно, мы сократили в 50 раз потребление оперативной памяти в ClickHouse: с 55 GB до 1.1 GB!

Из минусов можно выделить:

  1. Усложнение логики трансформации и как следствие – удорожание сопровождения и развития;

  2. Перестроение витрины остатков замедлилось примерно на 10-20%;

  3. Усложнились некоторые запросы SELECT к витрине остатков, так как значительно увеличилось количество обрабатываемых при запросе партиций, которые теперь не так явно связаны с временем (поле «date»). Добавляя в такие запросы предикаты, содержащие условия по названию партиций, мы оптимизируем их исполнение и получаем прежнюю скорость вычислений.

На приведённые «минусы» либо есть альтернативное решение, либо они полностью компенсируются огромным «плюсом» – многократным сокращением требуемых ресурсов (оперативной памяти) сервера ClickHouse и работоспособностью решения в целом.

На наш взгляд, данный кейс хорошо показывает, насколько важно детально продумывать и проектировать решения с ClickHouse. Из-за особенностей и ограничений реализованное решение/система/приложение может просто не заработать, если вовремя не учесть обрабатываемые объемы и, соответственно, не адаптировать подход к обработке (трансформации) данных.

Ну, и напоследок хотели бы дать небольшой совет, если вам предстоит реализовывать решения на базе ClickHouse: партиционировать большие таблицы разумно, чтобы в дальнейшем можно было сократить ресурсы на обработку данных и, возможно, упростить разработку/сократить время разработки. Не всегда интуитивное партиционирование (например, только по дате) позволит в дальнейшем нормально работать с данными – иногда это приводит к неработоспособности решения (как в нашем примере). В то же время, слишком детализированное партиционирование (много маленьких партиций) может значительно увеличить время трансформации и усложнить логику приложений, реализующих её. Поэтому важно как можно раньше выяснить обрабатываемые объемы данных, требования к скорости их подготовки, а также найти золотую середину между сложностью партиционирования таблиц и логики трансформаций.

Желаем удачи!

Комментарии (2)


  1. JohnSelfiedarum
    21.04.2023 13:35

    Грамотно построенный запрос - хорошо. Грамотно построенная архитектура - залог успеха!


  1. BackDoorMan
    21.04.2023 13:35
    +1

    Ну можно, конечно, скриптами обсчитывать. А можно было бы использовать Materialized View и писать в SummingMergeTree