Около двух лет назад вышла небольшая статья Kafka Streams — непростая жизнь в production, в которой я описывал сложности, с которыми наша команда столкнулась при попытке решить задачи проекта с помощью kafka-streams. Эксперимент вышел неудачным, и мы в итоге совсем отказались от этой технологии. Вместо нее решили попробовать Clickhouse (CH), и сейчас уже можно сказать, что эта база нам очень хорошо подошла и отлично решает почти все задачи, которые нам ставит бизнес. В этой статье я расскажу об особенностях использования CH.

При этом я не противопоставляю kafka-streams и clickhouse, и даже не сравниваю их, так как это совершенно разные технологии. Обе они в теории могли бы решать те задачи, которые перед нами стояли. В этом, возможно, единственное сходство между ними.

Внедрение на проекте CH было сложным и трудоемким процессом, который в итоге окончился успешно, хотя и не без проблем. В этой статье, как и в предыдущей, я опишу сложности, с которыми мы сталкивались на разных этапах. Также упомяну некоторые проверенные временем практики, которые мы использовали. Это только наш опыт, которого по большей части нет в документации CH. Надеюсь, он будет полезен читателям.

Когда я только начинал работать с CH и проектировал архитектуру системы на его основе, мне очень не хватало опыта со стороны, чтобы кто-то мог сказать “да, это будет работать” или “нет, так лучше не делать”. Задача этой статьи в том, чтобы дать ответы на некоторые подобные вопросы тем, кто, также как и я два года назад, в этом нуждается.

Что мы имеем сейчас?

На сегодняшний день у нас работает не самый большой, но уже и не маленький кластер CH, который расположен в двух датацентрах на разных континентах. В общей сложности это 14 серверов: 7 шард по 2 реплики в каждой.

Изначально мы внедряли CH версии 20.3.19.4, после чего он долгое время работал без обновлений, и только совсем недавно мы решились обновить его до 23.3.1.2823. Ниже я опишу, с чем мы столкнулись при обновлении.

У нас имеется одна основная “сырая” (или “широкая”) таблица, в которую записываются события со всей платформы, и вокруг неё ещё около 70 мелких таблиц, так называемых Materialized Views (MV), которые являются производными от основной и содержат по большей части агрегированные данные.

В сырой таблице около 150 столбцов. Мы делаем в нее около 3 миллионов INSERT’ов в секунду. За сутки это примерно 3 терабайта сжатых данных. Вставка происходит через встроенный в базу kafka движок. При этом в кластер приходит около 3 SELECT запросов в секунду (просто трёх, не миллионов). Сами сервера довольно мощные, и не слишком расслаблены. Однако в целом имеют достаточный запас прочности для увеличения нагрузки.

Перед внедрением CH у нас были большие сомнения, потянем ли мы в принципе такой большой объем входящих событий при условии, что мы храним в базе не только агрегированные значения (MV), но и сами сырые данные для анализа. Для нас количество серверов на платформе - это довольно критичный показатель. Как в итоге оказалось, CH действительно хорошо оптимизирован и потребляет на удивление мало ресурсов по отношению к такому объёму данных. В этом плане мы были приятно удивлены.

Далее пройдемся по основным решениям, которые мы используем в нашей системе.

Kafka engine

Как я уже писал, мы решили вставлять данные в базу именно через встроенный в CH движок. Причин было несколько:

  • Не хотелось самостоятельно решать вопрос с группировкой событий в пачки перед вставкой.

  • Если писать отдельный сервис для вставки (или брать уже готовый), то это дополнительные сервера, дополнительная нагрузка на сеть, а также расходы на разработку, поддержку и так далее. Проще говоря, использование kafka engine дешевле.

  • Алексей Миловидов советовал его использовать здесь: https://www.youtube.com/watch?v=Ac2C2G2g8Cg

  • Довольно большая работа по стабилизации движка проводилась этими ребятами: https://altinity.com/blog/clickhouse-kafka-engine-faq

В целом нареканий к движку у нас особо не было, кроме того момента, что в нашей версии 20.3.19.4 было необходимо создавать по одной таблице для каждого консумера. Как итог у нас было по 15 одинаковых таблиц на каждом сервере, у которых в названии были постфиксы _1, _2, … _15. Иначе масштабирование на большое количество партиций было невозможно.

В более новых версиях CH появилась возможность создать одну таблицу и указать у неё параметр kafka_num_consumers. Советую использовать именно этот вариант, т.к. каждая отдельная таблица сама по себе создает довольно ощутимую нагрузку на процессор.

По какой-то причине параметр kafka_num_consumers ограничен количеством ядер на сервере, так что если вам необходимо большее количество потоков, то придется создавать дополнительные таблицы.

Миграции

Одна из главных сложностей, с которой мы столкнулись при работе с этой базой - это добавление или удаление столбцов, особенно если это касается MV. Когда обновляли версию Clickhouse с 20.3.19.4 до 23.3.1.2823, мы делали это в несколько этапов. Каждый этап длился иногда не одну неделю, так что у меня есть опыт выполнения миграций на разных версиях в течение длительного времени. Вот неполный перечень различных проблем, которые у нас были без привязки к версиям:

  • Зависание команд alter на неопределенный срок, иногда больше суток, случайным образом.

  • Команда drop view случайным образом удаляет не только саму MV, но еще и таблицу, в которой хранятся данные.

  • Невозможность изменить запрос в MV без даунтайма: между командами drop view и create materialized view проходит интервал времени, в течение которого данные в таблицу не пишутся. В новых версиях попытались это исправить, но исправление работает не на все типы MV и, похоже, все еще является экспериментальной фичей.

  • Невозможность без ошибок выполнить команды alter совместно с on cluster на реплицированных таблицах.

  • Невозможность отсоединить MV от физической таблицы перед выполнением alter.

Здесь обращаю особое внимание на то, что это только те проблемы, с которыми в разное время сталкивались мы. Не могу утверждать, что они будут и у вас. Точно могу сказать одно: миграции в CH - это долгий и трудоемкий ручной процесс, который сложно (если вообще возможно) автоматизировать, и, что хуже всего, который осложняется проблемами и багами в самом CH, часть из которых я привёл выше.

Денормализация - это нормально

Скажу на первый взгляд страшную вещь: в нашей сырой таблице на самом деле находится 11 таблиц, если смотреть на нее с точки зрения реляционной базы данных. Однако для CH это вполне нормально.

Представьте себе ситуацию, что у вас есть два больших потока событий в вашей системе. Первых поток - огромный, а второй - просто большой. При этом первый и второй потоки связаны друг с другом связью один-ко-многим. В случае с реляционной базой вы создадите две таблицы и будете каждый поток событий писать в свою. А во время работы с этими данными будете делать операцию JOIN.

В случае с CH это большая ошибка. Вы не сможете делать JOIN на таблицах с сырыми данными по той причине, что данных слишком много. Они не уместятся в память.

Если вы уверены, что события из второго потока происходят только после того, как произойдет событие из первого (у нас это так), то правильным решением здесь будет сделать одну таблицу для первого потока, у которой есть структура Nested, сохраняющая в себе связанные события из второго потока. Nested - это по сути массив событий, в который вы можете добавлять новые по мере того, как они приходят. Здесь может возникнуть два вопроса:

Первый. Как мы можем добавлять новые события в массив, если CH не позволяет часто выполнять команду update? (Тем более что второй поток тоже большой) 

Ответом на этот вопрос является движок VersionedCollapsingMergeTree. Он прекрасно работает в этом кейсе, советую использовать. Сложность тут будет только в том, чтобы потокобезопасным образом инкрементировать поле version. Для этого вам придётся написать дополнительный код, используя для блокировок какой-нибудь внешний key-value storage, например Redis.

Также с этим движком немного сложнее выполнять запросы SELECT (придётся использовать поле sign), однако к этому быстро привыкаешь.

Наша сырая таблица работает на этом движке, и мы активно используем её для анализа данных практически ежедневно, делая в неё напрямую различные запросы.

Второй. Насколько удобно будет выполнять аналитические запросы, если события находятся в массиве?

Удобно. У CH есть большое количество функций для работы с массивами, особенно упрощает жизнь array join. Здесь каких-либо проблем у нас не возникало.

Приведу также другой пример денормализации, который у нас используется. Представьте, что у вас есть событие А, которое порождает несколько событий Б, каждое из которых порождает несколько событий В. При этом все три события происходят одновременно. В реляционной базе здесь очевидно будет три таблицы, связанные между собой связью один-ко-многим.

В CH вы можете складировать всё это в одну таблицу. Каждая строка в этой таблице будет событием В, а в ней будут также прописаны данные по событиям А и Б. Примерно так:

Строка 1: А1 Б1 В1
Строка 2: А1 Б1 В2
Строка 3: А1 Б1 В3
Строка 4: А1 Б2 В4
Строка 5: А1 Б2 В5
и так далее.

Вы скажете мне, что здесь будет много дублирования. Всё так, однако для CH это нормальная ситуация: он хорошо сжимает такие данные. При такой реализации схемы таблицы вы сможете делать любые аналитические запросы и создавать любые MV, избегая операций JOIN.

SummingMergeTree или AggregatingMergeTree?

Исходя из нашей практики, можно сказать, что подавляющее количество MV имеют движок SummingMergeTree, т.к. кроме суммирования в агрегатах редко когда используется что-то ещё. Здесь я конечно должен оговориться, что это может зависеть от предметной области. У нас ситуация такова, что из 70 таблиц мы только в одной используем AggregatingMergeTree.

Также у нас имеется негативный опыт использования функций uniqState / uniqMerge в AggregatingMergeTree: это настоящие убийцы производительности базы, которые генерируют огромное количество данных в каждой строчке. Постфактум это очевидно, если понимаешь, как эти функции работают. Однако это не отменяет того, что использовать их на больших объемах данных нельзя. И ты об этом не узнаешь, пока не попробуешь. Вполне можно представить, что эта пара функций не единственная такая, а движок AggregatingMergeTree имеет и другие  подобные неожиданности.

Общая мысль в том, что лучше по умолчанию всегда брать SummingMergeTree, а AggregatingMergeTree использовать в каких-то специфических случаях, внимательно наблюдая за тем, какую нагрузку он создает на серверах. Магия в нем, увы, отсутствует.

Запросы без индексов

Перед началом использования базы нас несколько тревожил тот факт, что столбцов в сырой таблице будет много, но при этом есть только первичный ключ, а индексы создавать нельзя. С какой же скоростью будут работать запросы?

Как в итоге оказалось, проблем с этим в большинстве случаев нет, если соблюдать следующие условия:

  • Всегда указывать в запросе временной интервал, за который вам необходимо что-либо посчитать. При этом столбец со временем события должен быть частью первичного ключа.

  • Использовать в запросах минимум столбцов. Чем меньше столбцов в запросе, тем меньше данных CH придется считывать с диска.

  • Запрашивать не слишком старые данные, например, за последние несколько часов; тогда данные будут извлекаться из быстрого системного кэша, вместо медленного диска.

Практически сто процентов наших запросов удовлетворяют этим условиям и работают с приемлемой для аналитиков скоростью. Изредка бывает нужно посчитать что-то по старым данным, например за предыдущие дни. В этом случае базе приходится читать их с диска, и тогда нужно набираться терпения, чтобы дождаться результатов.

Здесь мне следует подчеркнуть, что речь не о MV, а именно о запросах в сырую таблицу, которые выполняют аналитики в нашей команде. С MV в этом смысле проблем нет совсем никаких, т.к. данных там на порядки меньше. Если вам нужно предоставлять какую-либо агрегированную информацию конечным пользователям, то используйте именно MV.

Несколько дата центров

Наша система обрабатывает события в двух дата центрах: один расположен в Европе, второй - в США. При этом CH должен показывать совмещенные данные сразу с обеих мест. Перед нами встал выбор из двух вариантов:

  1. Переносить данные из одного континента в другой через механизм репликации.

  2. Поднимать в каждом дата центре свой набор шард, при этом репликацию между дата центрами не делать.

В первом случае мы будем передавать гораздо большие объёмы данных по интернету, но при этом SELECT запросы будут выполняться быстрее за счёт того, что все данные из кластера расположены в локальной сети. Во втором случае наоборот - между континентами данных передается относительно немного, но при этом на каждый SELECT мы вынуждены ждать ответа как с локального, так и с удаленного дата центров. Это заметно замедляет выполнение запросов.

Мы выбрали второй вариант, поскольку объем входящих данных в нашей системе несопоставимо выше, чем объем данных, который используется при SELECT запросах. Было опасение, что задержки в запросах будут слишком большие, однако это опасение не оправдалось. Во время тестирования мы наблюдали замедление около 200 миллисекунд, что для нас вполне приемлемо.

Мониторинг

CH имеет большое количество метрик, которые можно экспортировать в prometheus. По этим метрикам мы без особого труда настроили информативный dashboard, по которому в большинстве случаев легко понять, работает база в нормальном режиме или имеются какие-то проблемы. Вот список основных метрик, которые мы используем:

ClickHouseMetrics_ReadonlyReplica
ClickHouseMetrics_VersionInteger
ClickHouseMetrics_BackgroundSchedulePoolTask
ClickHouseMetrics_BackgroundMergesAndMutationsPoolTask
ClickHouseMetrics_BackgroundFetchesPoolTask
ClickHouseMetrics_ReplicatedFetch
ClickHouseMetrics_BackgroundMessageBrokerSchedulePoolTask
ClickHouseMetrics_HTTPConnection
ClickHouseProfileEvents_ReplicatedPartFailedFetches
ClickHouseProfileEvents_InsertedRows
ClickHouseProfileEvents_DelayedInserts
ClickHouseProfileEvents_InsertedBytes
ClickHouseProfileEvents_DelayedInsertsMilliseconds
ClickHouseProfileEvents_SelectQuery
ClickHouseProfileEvents_MergedRows
ClickHouseProfileEvents_RejectedInserts
ClickHouseProfileEvents_DistributedConnectionStaleReplica
ClickHouseAsyncMetrics_Uptime
ClickHouseAsyncMetrics_MaxPartCountForPartition
ClickHouseAsyncMetrics_ReplicasMaxAbsoluteDelay

Это конечно не полный список того, что есть. Однако если вы не знаете с чего начать, то можете взять эти метрики и по мере необходимости что-то добавлять или изменять.

Внешние справочники

Система внешних справочников оказалась для нас настолько удачной, что мы используем её так же часто, как и MV. Мало какие запросы в базу у нас обходятся без справочников.

Основное преимущество здесь в том, что вам не обязательно писать всю возможную информацию, которая у вас имеется, в сырую таблицу. Достаточно записать, например только id, и в дальнейшем запрашивать в запросах дополнительную информацию из справочников по нему. В плане скорости выполнения запросов такой подход по ощущениям бесплатен (мы используем в основном тип ключа flat и иногда ip_trie).

Проблемы при обновлениях

Как я уже писал, мы обновляли базу с версии 20.3.19.4 по 23.3.1.2823. Делали мы это не за один раз, а в несколько итераций, переходя постепенно с одной стабильной версии на другую из списка altinity: https://docs.altinity.com/altinitystablebuilds/.

Мотивацией обновить базу было в основном появление UDF и оконных функций, которые добавляли в наш арсенал дополнительные возможности по оптимизации, и во многих аспектах также делали нашу жизнь проще. В остальном для нужд бизнеса нам вполне хватало и старой версии базы.

Обновление прошло мягко говоря не без проблем. Опишу самые вопиющие случаи.

Первое с чем столкнулись, перейдя на одну из списка стабильных версий, это пропажа данных в таблицах. Схема у них тривиальная, выглядит примерно так:

create table foo
(
   date Date,
   field1 UInt16,
   field2 FixedString(2),
   field3 UInt16,
   aggregate1 Int64,
   aggregate2 Decimal128(15)
)
engine = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/foo', '{replica}')
partition by toYYYYMMDD(date)
order by (date, field1, field2, field3)

--Запись в таблицу происходит с помощью MV:
create materialized view foo_mv to foo
as select date,
         field1,
         field2,
         field3,
         sum(sign) as aggregate1,
         sum(money * sign) as aggregate2
from bar
group by toDate(eventTime) as date, field1, field2, field3;

Здесь bar - это наша сырая таблица, через которую проходят вставки агрегатов в остальные MV.

Нужно сказать, что мы в целом стараемся не использовать в CH какие-либо недокументированные, нерекомендуемые или экспериментальные фичи. И работаем строго по документации. Поэтому наши MV и таблицы в большинстве своём довольно просты. Я бы даже сказал скучны.

Мы были очень удивлены, когда сразу после обновления базы к нам начали приходить жалобы от пользователей на то, что у них в отчетах пропадают деньги. Причем данные пропадали в случайное время: иногда целый день всё было в порядке, а иногда данные терялись по несколькj раз в час. При этом они терялись не полностью: часть пропадала, а часть оставалась. Это происходило только за прошедшие дни, тоже случайным образом: за вчера, за несколько дней назад и т.д.

Пользователь мог в конце дня увидеть, например, что он заработал 1000$. А на следующий день утром он мог увидеть уже, например 700$ вместо 1000$. А за прошлую неделю не досчитаться ещё $2000 и так далее.

Мы пытались найти эту проблему в телеграм-канале CH и в github’е, но, к сожалению, безуспешно. Самостоятельный быстрый анализ показал только то, что потеря происходит во время мержа кусков данных. После слияния двух кусков с корректными данными мог получиться кусок без данных.

Проблемные таблицы до обновления работали без каких-либо нареканий в течение двух лет и сломались именно после обновления, так что мы были уверены, что проблема в самой базе, что это не наша ошибка.

Способ, которым мы это преодолели, был прост, но эффективен. Во-первых мы наладили механизм оперативного восстановления данных и выполняли его каждый раз, когда данные пропадали. После этого мы начали отключать одну за одной все настройки, появившиеся или изменившиеся в новой версии CH до тех пор, пока проблема не перестала воспроизводиться. Таким образом где-то в течение недели удалось найти виновника происшествия: это настройка optimize_on_insert, которая в старой версии была по умолчанию выключена, а в новой включилась. После того, как мы отключили эту настройку, данные из базы перестали пропадать.

Также во время обновлений сталкивались с поломками обратной совместимости. Например у нас есть несколько справочников с ключом ip_trie. В старой версии, чтобы получить данные по такому ключу, нужно было на вход функции dictGet передавать IP адреса типа FixedString(16). Но у нас IP адреса имеют тип IPv6. Соответственно чтобы это работало, нам приходилось выполнять такое преобразование: 

dictGet('my_dictionary', 'my_field', tuple(cast(ip_field as FixedString(16))). 

После обновления это перестало работать и мы начали получать ошибку

 <Error> void DB::StorageKafka::threadFunc(size_t): Code: 48. DB::Exception: CAST AS
FixedString is only implemented for types String and FixedString: While processing
dictGet … (NOT_IMPLEMENTED).

Мы тестировали все обновления в тестовом окружении, поэтому подобные ошибки нам не доставляли особых неудобств. Мы просто редактировали наши запросы так, чтобы они работали в обновленной версии. Однако это, к сожалению, получалось не всегда. Для записи данных из kafka в CH мы используем протокол capnp, и после очередного обновления десериализация сломалась сразу в нескольких местах так, что в некоторых случаях не существовало даже обходных путей. Я заводил на это пару тикетов: https://github.com/ClickHouse/ClickHouse/issues/43319 и https://github.com/ClickHouse/ClickHouse/issues/46522 . К счастью удалось связаться с автором изменений, и он оперативно устранил все проблемы.

Напоследок, для полной картины, опишу самую последнюю проблему, с которой столкнулись буквально на днях. Мы впервые начали использовать UDF в запросах из наших сервисов. И сразу столкнулись с тем, что в CH в таблице system.processes начали “зависать” незавершенные запросы. Причём зависают они случайным образом, в случайные моменты времени и совсем необязательно в них должны присутствовать UDF: зависают даже старые, совсем легковесные запросы, которые давно не изменялись. В течение нескольких дней эти зависшие запросы копятся, пока не достигают максимального значения, и CH перестаёт принимать на вход новые запросы.

Пробовали чинить эту проблему как с помощью конфигов CH, так и на стороне наших сервисов, но пока что безрезультатно. Приходится раз в несколько дней перезагружать все сервера, чтобы таблица с зависшими запросами очистилась. И да, команда kill query на них не работает.

Не так давно вышла очередная стабильная версия базы: https://docs.altinity.com/releasenotes/altinity-stable-release-notes/23.3/altinity-stable-23.3.8/. Она новее нашей на несколько минорных версий. На сегодняшний день основная надежда на то, что проблема уйдет, если обновимся.

Заключение

Если бы я вернулся на несколько лет назад и снова выбирал технологию для нашего проекта, то мой выбор бы не изменился: я бы снова взял CH, и снова реализовал бы все те же архитектурные решения. В этом смысле база идеально легла на наш проект. Мне неизвестны более удачные варианты. На мой взгляд сегодня CH в нашем проекте является безальтернативным решением.

При этом я всё же призываю читателей дважды подумать, прежде чем решиться начать использовать CH. Это очень специфичный инструмент под очень специфичные задачи. Вы определенно будете иметь с ним большой букет проблем, которые, впрочем, обычно решаемы.

Если вы не уверены, нужен ли вам CH на проекте, то вероятнее всего он вам не нужен, и следует взять на вооружение классическую реляционную базу.

Автор статьи: Андрей Буров, Максилект

P.S. Мы публикуем наши статьи на нескольких площадках Рунета. Подписывайтесь на нашу страницу в VK или на Telegram-канал, чтобы узнавать обо всех публикациях и других новостях компании Maxilect.

Комментарии (0)