В этой статье мы поговорим о том, как реализовать поведение атомарной вставки в ClickHouse. Рассмотрим несколько вариантов, подсветим их сильные и слабые стороны, а также, когда каждый из них применять.
Задача
Мы хотим добиться, чтобы не было случаев, когда мы начали вставку, а пользователь прочитал данные до её завершения и получил неактуальный (неполный) набор данных.
Неатомарная вставка = риск чтения некорректного набора данных.
Сценарии, когда такое может произойти:
Удалили партицию и хотели начать вставлять данные взамен удалённой, но пользователь обратился к этому интервалу.
Вставка данных оборвалась (из-за проблем с сетью, например).
Вставляем данные батчами, и пользователь проверил наличие данных, но не проверил, что все батчи на месте — прочитал часть данных.
Перейдём к реализации атомарной вставки в ClickHouse.
Вариант 1. Настройка min_insert_block_size_rows
Подход описан в статье от Altinity.
Суть его заключается в том, что мы можем управлять размером батча при вставке данных и принудительно вставить данные единым блоком.
Метод сработает, если при вставке мы создадим только 1 парт (какие критерии этого можно прочитать в статье)
Скрипт для реализации подхода
-- Создаем целевую таблицу
drop table if exists core.data;
create table if not exists core.data
(
num_int Int64,
partiton_num String
)
engine = MergeTree
order by num_int;
-- Вставляем данные
insert into core.data
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
intDiv(number, 1_000_000) as partiton_num
from
numbers(100_000_000);
-- Видим что партов больше чем 1
SELECT
count(),
min(rows),
max(rows),
sum(rows)
FROM system.parts
WHERE (level = 0) AND (table = 'data');
-- count()|min(rows)|max(rows)|sum(rows)|
-- -------+---------+---------+---------+
-- 96| 397345| 1048449|100000000|
-- Пересоздаем целевую таблицу
drop table if exists core.data;
create table if not exists core.data
(
num_int Int64,
partiton_num String
)
engine = MergeTree
order by num_int;
-- Вставляем данные но уже с настройкамии
-- input_format_parallel_parsing=0,
-- min_insert_block_size_bytes=0,
-- min_insert_block_size_rows=1000000000
-- так мы задаем что если строк меньше чем 1_000_000_000
-- то данные будут вставлены единым партом
insert into core.data
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
intDiv(number, 1_000_000) as partiton_num
from
numbers(100_000_000)
settings
input_format_parallel_parsing=0,
min_insert_block_size_bytes=0,
min_insert_block_size_rows=1_000_000_000;
-- Видим что создался только 1 парт
SELECT
count(),
min(rows),
max(rows),
sum(rows)
FROM system.parts
WHERE (level = 0) AND (table = 'data');
-- count()|min(rows)|max(rows)|sum(rows)|
-- -------+---------+---------+---------+
-- 1|100000000|100000000|100000000|Кажется что можно остановиться на этом варианте, но как писал выше у него есть минусы и зоны применения.
Плюсы:
Лаконичная и простая реализация
Минусы:
-
Высокое потребление памяти при вставке, ниже видем что даже на "игрушечном" примере разница значительная (в 10 раз). Вызвано это тем что весь парт предварительно пишется в оперативную память
event_time |memory_usage_gb|query | -------------------+---------------+------------------+ 2025-11-06 17:02:17| 3.5|with settings | 2025-11-06 17:04:13| 0.034|with out settings |
Если захотим вставлять данные батчами, то вариант не подойдет
Когда применять:
Небольшие таблицы
Единичная вствка в одну партицию
Если скорость разработки у вас в приоритете, и вы готовы принять, то что будет использоваться оперативная память на вставку
Вариант 2. Временная таблица + переименование
Вариант заключается в том, что мы вставляем новые данные во временную таблицу, после чего меняем местами названия таблиц (временная становится основной, а основная — временной).
Возможно это благодаря операции exchange в ClickHouse.
exchange tables table1 and table2
Скрипт для реализации подхода
-- Создаем целевую таблицу
drop table if exists core.data;
create table if not exists core.data
(
num_int Int64,
partiton_num String
)
engine = MergeTree
order by num_int;
-- Наполняем целевую таблицу данными
insert into core.data
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
intDiv(number, 1_000_000) as partiton_num
from
numbers(100_000_000);
-- Видим что в ней числа от 0 до 99_999_999
select
min(num_int),
max(num_int),
from core.data;
-- Создаем временную таблицу
drop table if exists stage.data_tmp;
create table if not exists stage.data_tmp
(
num_int Int64,
partiton_num String
)
engine = MergeTree
order by num_int;
-- Вставляем в нее числа в диапазоне 100_000_000 - 200_000_000
insert into stage.data_tmp
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
intDiv(number, 1_000_000) as partiton_num
from
numbers(100_000_000, 100_000_000);
-- Меняем местами временную и целевую таблицы
exchange tables core.data and stage.data_tmp;
-- Видим что в целевой обновились данные
-- и произошло это абсолютно атомарно
select
min(num_int),
max(num_int)
from core.data;Плюсы:
Достаточно простая и понятная реализация.
Можно откатиться, если снова переименовать таблицы (восстановить предыдущее состояние).
Нет затрат на дополнительную оперативную память.
Минусы:
Нужна дополнительная таблица, в которой необходимо поддерживать консистентность полей с основной.
Дополнительное место на диске (можно очищать tmp после переименования, и тогда этот минус почти незначительный).
Когда применять:
Таблицы ещё относительно небольшие, их можно полностью пересобрать.
Вариант 3. Временная таблица + replace партиции
Если мы не готовы полностью перезаписывать всю таблицу, мы можем доработать подход из варианта 2 и перезаписывать только нужную партицию.
Операция replace partition в ClickHouse работает атомарно, то есть партиция обновится сразу, а не отдельными блоками.
Скрипт для реализации подхода
-- Создаем целевую таблицу
-- Партиционируем таблицу (в 1й партиции миллион строк)
drop table if exists core.data;
create table if not exists core.data
(
num_int Int64,
partiton_num String
)
engine = MergeTree
partition by partiton_num
order by num_int;
-- Наполняем целевую таблицу данными
-- от 1 до 100М
insert into core.data
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
intDiv(number, 1_000_000) as partiton_num
from
numbers(100_000_000);
-- Видим что в партиции с id '99'
-- дежит диапазон 99М - 100М
select
min(num_int),
max(num_int)
from core.data
where partiton_num = '99';
-- Создаем временную таблицу
drop table if exists stage.data_tmp;
create table if not exists stage.data_tmp
(
num_int Int64,
partiton_num String
)
engine = MergeTree
partition by partiton_num
order by num_int;
-- Для временной таблицы заполняем только партицию с id '99'
-- Причем делаем диамазон 100М - 101М
insert into stage.data_tmp
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
'99' as partiton_num
from
numbers(100_000_000, 1_000_000);
-- Проверяем что вставился именно диапазон 100М - 101М
select
min(num_int),
max(num_int)
from stage.data_tmp
where partiton_num = '99';
-- Делаем атомарную операцию replace partition
-- Партиция '99' копируется из stage.data_tmp и заменяется в core.data
alter table core.data replace partition '99' from stage.data_tmp;
-- Видим что в целевой таблице данные изменены
-- В партиции '99' диапазон 100М - 101М
select
min(num_int),
max(num_int)
from core.data
where partiton_num = '99';Плюсы:
Перезапись короткого интервала, что чаще используется, чем обновление всей таблицы.
Нет затрат на дополнительную оперативную память.
Минусы:
Нужна дополнительная таблица, в которой необходимо поддерживать консистентность полей с основной.
Дополнительное место на диске (можно очищать tmp после переименования, что уменьшает минус).
Когда применять:
В случаях, когда используется стратегия обновления через перезапись партиций.
Вариант 4. View с фильтрами
Считаю, что поддерживать временную и целевую таблицы консистентными (одинаковый набор полей и их порядок) может быть затруднительно, особенно если вы синхронизируете их вручную.
Часто пользователи обращаются к таблицам не напрямую, а через view, где может быть реализована легковесная логика. В таком случае мы можем использовать это view как инструмент атомарного обновления данных.
Подход:
Имеем таблицу и view поверх неё.
Во view используем фильтр, который выбирает только актуальные данные. Например, фильтр может быть таким: partiton_num < 100.
Добавляем новую партицию данных insert'ом в целевую таблицу.
Пересоздаём view с обновлённым фильтром.
Скрипт для реализации подхода
-- Создаем целевую таблицу
drop table if exists core.data;
create table if not exists core.data
(
num_int Int64,
partiton_num Int16
)
engine = MergeTree
partition by partiton_num
order by num_int;
-- Наполняем целевую таблицу данными
-- от 1 до 100М
insert into core.data
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
intDiv(number, 1_000_000) as partiton_num
from
numbers(100_000_000);
-- Создаем view для целевой таблицы
create or replace view core.v_data as
select
num_int,
partiton_num
from core.data
where partiton_num < 100;
-- Видим что доступны данные в интервале
-- от 1 до 100М
select
min(num_int),
max(num_int)
from core.v_data;
-- Вставляем еще блок данных в диапазоне 100М - 101М
-- Для наглядности делаем это в 2 подхода
-- 1й батч
insert into core.data
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
'100' as partiton_num
from
numbers(100_000_000, 500_000);
-- 2й батч
insert into core.data
(
num_int,
partiton_num
)
select
toInt64(number) num_int,
'100' as partiton_num
from
numbers(100_500_000, 500_000);
-- Видим что пока нам все еще доступен интервал
-- от 1 до 100М
select
min(num_int),
max(num_int)
from core.v_data;
-- Пересоздаем view и тем самым предоставляя пользователю новые данные
create or replace view core.v_data as
select
num_int,
partiton_num
from core.data
where partiton_num < 101;
-- Видим что доступны данные до 101М
select
min(num_int),
max(num_int)
from core.v_data;Плюсы:
Нет дополнительных затрат на дисковое пространство.
Легко реализовать, если view уже используется.
Минусы:
Не самый гибкий подход. Если требуется перезапись существующих данных, придётся дважды обновлять view для одной вставки (до и после), что приведёт к большим объёмам DDL-операций на кластере.
Когда применять:
Если view уже используется для работы с данными.
Если фильтр выбора актуальных данных не слишком сложный (подходит для таблиц, где данные в основном только добавляются и редко перезаписываются).
Примечания и советы:
Используйте insert_quorum (https://clickhouse.com/docs/operations/settings/settings#insert\_quorum). Без этой настройки есть риск, что данные запишутся только на одну реплику, а затем чтение будет происходить с другой, на которую данные ещё не успели реплицироваться.
Спасибо что прочитали до конца
Приглашаю вступить в мой Telegram-канал - flow_data, там я буду публиковать материалы и заметки по Data Engineering.