Развивая любой продукт, будь то видеосервис или лента, истории или статьи, хочется уметь измерять условное «счастье» пользователя. Понимать, делаем мы своими изменениями лучше или хуже, корректировать направление развития продукта, опираясь не на интуицию и собственные ощущения, а на метрики и цифры, в которые можно верить.
В этой статье я расскажу, как нам удалось запустить продуктовую статистику и аналитику на сервисе с 97-миллионной месячной аудиторией, получив при этом чрезвычайно высокую производительность аналитических запросов. Речь пойдёт о ClickHouse, используемых движках и особенностях запросов. Я опишу подход к агрегации данных, который позволяет нам за доли секунды получать сложные метрики, и расскажу о преобразовании и тестировании данных.
Сейчас у нас около 6 миллиардов продуктовых событий в сутки, в ближайшее время дойдём до 20–25 миллиардов. А дальше — не такими быстрыми темпами поднимемся до 40–50 миллиардов к концу года, когда опишем все интересующие нас продуктовые события.
1 rows in set. Elapsed: 0.287 sec. Processed 59.85 billion rows, 59.85 GB (208.16 billion rows/s., 208.16 GB/s.)
Подробности под катом.
Предисловие
Аналитические инструменты были ВКонтакте и раньше. Уникальные пользователи считались, можно было строить графики событий по срезам и тем самым проваливаться вглубь сервиса. Однако речь шла о фиксированных заранее срезах, об агрегированных данных, об HLL для уников, о некоторой скованности и невозможности отвечать быстро на вопросы чуть более сложные, чем «сколько?».
Конечно, был, есть и будет hadoop, в него тоже писалось, пишется и будет писаться много, очень много логов использований сервисов. К сожалению, hdfs применялся лишь некоторыми командами для реализации собственных задач. Ещё более печально, что hdfs — не про быстрые аналитические запросы: ко многим полям были вопросы, ответы на которые приходилось искать в коде, а не в доступной всем документации.
Мы пришли к выводу, что так жить больше нельзя. Данными должна обладать каждая команда, запросы над ними должны быть быстрыми, а сами данные — точными и богатыми на полезные параметры.
Поэтому мы сформулировали понятные требования к новой системе статистики/аналитики:
- аналитические запросы должны быть быстрыми;
- данные — достаточно точными, в идеале это сырые события пользовательского взаимодействия с сервисом;
- структура событий должна быть описана, понятна и доступна;
- обеспечивается надёжность хранения данных, гарантия однократной доставки;
- есть возможность считать уников, аудиторию (дневную, недельную, месячную), метрики удержания, проведённое пользователем в сервисе время, квантили действий на уника и другие метрики по множеству срезов;
- выполняется тестирование, преобразование данных и визуализация.
На кухне
Опыт подсказывал, что мы нуждаемся в двух базах: медленной, где мы бы агрегировали и обогащали данные, и быстрой, где с этими данными можно было бы работать и поверх которой строить графики. Это один из самых распространённых подходов, при котором в медленной базе, например, в hdfs, строятся разные проекции — на уников и на количество событий по срезам на определённый промежуток в времени.
Тёплым сентябрьским днём, при разговоре за чашкой чая на кухне с видом на Казанский собор, у нас возникла идея попробовать ClickHouse в качестве быстрой базы — на тот момент мы уже использовали его для хранения технических логов. Было много сомнений, связанных в первую очередь со скоростью и надёжностью: заявленные тесты производительности казались нереальными, а новые релизы базы периодически ломали имеющуюся функциональность. Потому предложение было простым — пробовать.
Первые пробы
Мы развернули кластер из двух машин с такой конфигурацией:
2xE5-2620 v4 (32 ядра в сумме), 256G ram, 28T места (raid10 с ext4).
Изначально был near layout, но затем мы перешли на far. В ClickHouse есть множество различных движков таблиц, но основные — из семейства MergeTree. Выбрали ReplicatedReplacingMergeTree с примерно такими настройками:
PARTITION BY dt
ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id)
SAMPLE BY cityHash64(user_id)
SETTINGS index_granularity = 8192;
Replicated — означает, что таблица реплицируемая, и это решает одно из наших требований к надёжности.
Replacing — таблица поддерживает дедупликацию по первичному ключу: по умолчанию первичный ключ совпадает с ключом сортировки, так что секция ORDER BY как раз и говорит о том, что такое первичный ключ.
SAMPLE BY — семплирование тоже хотелось попробовать: sample возвращает равномерно-псевдослучайную выборку.
index_granularity = 8192 — это магическое число строк данных между засечками индекса (да, он разреженный), которое используется по умолчанию. Мы его не меняли.
Партиционирование сделали по дням (хотя по умолчанию — по месяцам). Множество запросов к данным, как предполагалось, будут внутридневными — например, построить минутный график просмотров видео за определённый день.
Далее мы взяли кусок технических логов и наполнили таблицу примерно миллиардом строк. Прекрасное сжатие, группировки по колонкам типа Int*, подсчёты уникальных значений — всё работало невероятно быстро!
Говоря о скорости, имею в виду, что ни один запрос не длился дольше 500 мс, а большинство из них укладывались в 50–100 мс. И это на двух машинах — причём подсчётами, по сути, занималась только одна.
Мы посмотрели на это всё и представили, что вместо колонки UInt8 будет id страны, а колонку Int8 заменят данные, например, о возрасте пользователя. И поняли, что ClickHouse нам полностью подходит, если сделать всё правильно.
Строгая типизация данных
Польза от ClickHouse начинается ровно тогда, когда сформирована правильная схема данных. Пример: platform String — плохо, platform Int8 + словарь — хорошо, LowCardinality(String) — удобно и хорошо (о LowCardinality я расскажу чуть позже).
Мы создали специальный класс-генератор в php, который по запросу создаёт классы-обёртки над событиями на основании таблиц в ClickHouse, и единую точку входа в логирование. Поясню на примере схемы, которая получилась:
- Аналитик/дата-инженер/разработчик описывает документацию: какие поля, возможные значения, события необходимо логировать.
- Cоздаётся таблица в ClickHouse в соответствии со структурой данных из предыдущего пункта.
- Генерятся классы-обёртки для событий на основании таблицы.
- Продуктовой командой реализуется заполнение полей объекта этого класса, отправка.
Изменить схему на уровне php и тип логируемых данных не получится, не изменив прежде таблицу в ClickHouse. А это в свою очередь нельзя сделать без согласования с командой, изменения документации и описания событий.
Для каждого события можно выставить две настройки, которые регулируют процент отправляемых событий в ClickHouse и hadoop соответственно. Настройки нужны в первую очередь для постепенной раскатки с возможностью вырубить логирование, если что-то пойдёт не так. До hadoop данные доставляются по стандартной схеме с использованием Kafka. А в ClickHouse они летят через схему с KittenHouse в persistent-режиме, гарантирующем хотя бы однократную доставку события.
Событие доставляется в буферную таблицу на нужный шард, исходя из остатка от деления некоторого hash от user_id на количество шардов в кластере. Далее буферная таблица сбрасывает данные в локальную ReplicatedReplacingMergeTree. А поверх локальных таблиц натянута распределённая таблица с движком Distributed, которая позволяет обращаться к данным со всех шардов.
Денормализация
ClickHouse — столбцовая СУБД. Она не про нормальные формы, а значит лучше иметь всю информацию прямо в событии, чем join-ить. Join-ы тоже есть, но если правая таблица не помещается в память, начинается боль. Поэтому мы приняли волевое решение: вся интересующая нас информация должна храниться в самом событии. Например, пол, возраст пользователя, страна, город, день рождения — всё то, что является публичной информацией, которая может пригодиться для аналитики аудитории, а также все полезные сведения об объекте взаимодействия. Если, например, речь идёт о видео — это video_id, video_owner_id, дата загрузки видео, длина, качество в момент события, максимальное качество и так далее.
В общей сложности в каждой таблице у нас от 50 до 200 колонок, при этом во всех таблицах есть сервисные поля. Например, лог ошибки error_log — на самом деле ошибкой мы называем выход за диапазоны типа. На случай, если в поле с возрастом польются странные значения, выходящие за размеры типа.
Тип LowCardinality(T)
В ClickHouse есть возможность использовать внешние словари. Они хранятся в памяти, периодически обновляются, могут эффективно применяться в различных сценариях, в том числе как классические справочники. Например, вы хотите логировать операционную систему и у вас есть две альтернативы: строка или число + справочник. Само собой, на больших объёмах данных, да и для высокой производительности аналитических запросов, логично писать число, а строковое представление получать из словаря, когда понадобится:
dictGetString('os', 'os_name', toUInt64(os_id))
Но есть намного более удобный способ — использовать тип LowCardinality(String), который автоматически строит словарь. Производительность работы с LowCardinality при условии низкой кардинальности множества значений радикально выше, чем со String.
Мы, например, используем LowCardinality(String) для типов событий 'play', 'pause', 'rewind'. Или для platform: 'web', 'android', 'iphone':
SELECT
vk_platform,
count()
FROM t
WHERE dt = yesterday()
GROUP BY vk_platform
Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB
(13.65 billion rows/s., 41.04 GB/s.)
Фича пока экспериментальная, так что для её использования необходимо выполнять:
SET allow_experimental_low_cardinality_type = 1;
Но есть ощущение, что через какое-то время она окажется уже не под настройкой.
Агрегация данных ВКонтакте
Так как колонок много, а событий очень много, то естественное желание — подрезать «старые» партиции, но прежде — собирать агрегаты. Изредка бывает нужно проанализировать сырые события (месячной или годичной давности), поэтому мы не подрезаем данные в hdfs — любой аналитик может обратиться к нужному паркету за любую дату.
Как правило, при агрегации во временной интервал мы всегда упираемся в то, что количество строк на единицу времени равно произведению мощностей срезов. Это накладывает ограничения: страны начинают собирать в группы типа 'Россия', 'Азия', 'Европа', 'Остальной мир', а возрасты — в интервалы, чтобы понизить размерность до условного миллиона строк на дату.
Агрегация по dt, user_id
Но ведь у нас реактивный ClickHouse! Можем разогнаться до 50?100 млн строк на дату?
Быстрые тесты показали, что можем, и в этот момент возникла простая идея — оставить пользователя в агрегате. А именно — агрегировать не по «дата, срезы» средствами spark, а по «дата, пользователь» средствами ClickHouse, производя при этом некоторое «транспонирование» данных.
При таком подходе мы сохраняем пользователей в агрегированных данных, а значит по-прежнему можем считать аудиторные показатели, метрики удержания и частотности. Можем соединять агрегаты, считая общие аудитории нескольких сервисов вплоть до всей аудитории ВКонтакте. Всё это можно делать по любому срезу, который присутствует в таблице за условно то же время.
Проиллюстрирую на примере:
После агрегации (ещё много колонок справа):
При этом агрегация происходит именно по (dt, user_id). Для полей с информацией по пользователю при такой агрегации можно использовать функции any, anyHeavy (выбирает часто встречающееся значение). Можно, например, собрать в агрегат anyHeavy(platform), чтобы знать, с какой платформы в основном пользователь вызывает видеособытия. При желании можно использовать groupUniqArray(platform) и хранить массив всех платформ, с которых пользователь вызывал событие. Если и этого мало, можно завести отдельные колонки под платформы и хранить, например, количество уникальных видео, досмотренных до половины с конкретной платформы:
uniqCombinedIf(cityHash64(video_owner_id, video_id),
(platform = 'android') AND (event = '50p')) as uniq_videos_50p_android
При таком подходе получается достаточно широкий агрегат, в котором каждая строка является уникальным пользователем, а каждая колонка содержит информацию или по пользователю, или по его взаимодействию с сервисом.
Получается, чтобы посчитать DAU сервиса, достаточно выполнить такой запрос поверх его агрегата:
SELECT
dt,
count() as DAU
FROM agg
GROUP BY dt
Elapsed: 0.078 sec.
Или вычислить, сколько дней пользователи присутствовали в сервисе за неделю:
SELECT
days_in_service,
count() AS uniques
FROM
(
SELECT uniqUpTo(7)(dt) AS days_in_service
FROM agg2
WHERE dt > (yesterday() - 7)
GROUP BY user_id
)
GROUP BY days_in_service
ORDER BY days_in_service ASC
7 rows in set. Elapsed: 2.922 sec.
Можем ускорить семплированием, при этом почти не теряя в точности:
SELECT
days_in_service,
10 * count() AS uniques
FROM
(
SELECT uniqUpTo(7)(dt) AS days_in_service
FROM agg2
SAMPLE 1 / 10
WHERE dt > (yesterday() - 7)
GROUP BY user_id
)
GROUP BY days_in_service
ORDER BY days_in_service ASC
7 rows in set. Elapsed: 0.454 sec.
Стоит сразу отметить, что семплирование идёт не по проценту событий, а по проценту пользователей — и в результате становится невероятно мощным инструментом.
Или то же за 4 недели с семплированием 1/100 — примерно на 1% менее точные результаты получаются.
SELECT
days_in_service,
100 * count() AS uniques
FROM
(
SELECT uniqUpTo(7)(dt) AS days_in_service
FROM agg2
SAMPLE 1 / 100
WHERE dt > (yesterday() - 28)
GROUP BY user_id
)
GROUP BY days_in_service
ORDER BY days_in_service ASC
28 rows in set. Elapsed: 0.287 sec.
Агрегация с другой стороны
При агрегации по (dt, user_id) мы не теряем пользователя, не сильно упускаем информацию о его взаимодействии с сервисом, однако, несомненно, утрачиваем метрики о конкретном объекте взаимодействия. Но ведь можно и это не терять — давайте построим агрегат по
(dt, video_owner_id, video_id), придерживаясь тех же идей. Максимально сохраняем информацию о видео, не сильно упускаем данные о взаимодействии видео с пользователем и совсем пропускаем информацию о конкретном пользователе.
SELECT starts
FROM agg3
WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...)
1 rows in set. Elapsed: 0.030 sec
Или топ-10 видео по просмотрам за вчера:
SELECT
video_id,
video_owner_id,
watches
FROM video_agg_video_d1
WHERE dt = yesterday()
ORDER BY watches DESC
LIMIT 10
10 rows in set. Elapsed: 0.035 sec.
В итоге у нас есть схема агрегатов вида:
- агрегация по «дата, пользователь» в рамках продукта;
- агрегация по «дата, объект взаимодействия» в рамках продукта;
- иногда возникают и другие проекции.
Azkaban и TeamCity
Стоит напоследок сказать пару слов про инфраструктуру. Сбор агрегатов у нас запускается ночью, стартуя с OPTIMIZE над каждой из таблиц с сырыми данными, чтобы вызвать внеочередное слияние данных в ReplicatedReplacingMergeTree. Операция может длиться достаточно долго, однако она необходима для удаления дублей, если они возникли. Тут стоит отметить, что пока я ещё ни разу не сталкивался с дубликатами, но нет гарантий, что они не появятся в будущем.
Следующий шаг — создание агрегатов. Это bash-скрипты, в которых происходит следующее:
- сначала мы получаем количество шардов и какой-нибудь хост из шарда:
SELECT shard_num, any(host_name) AS host FROM system.clusters GROUP BY shard_num
- затем скрипт выполняет последовательно для каждого шарда (clickhouse-client -h $host) запрос вида (для агрегатов по пользователям):
INSERT INTO ... SELECT ... FROM ... SAMPLE 1/$shards_count OFFSET 1/$shard_num
Это не совсем оптимально и может порождать много сетевого взаимодействия между хостами. Однако при добавлении новых шардов всё продолжает работать из коробки, сохраняется локальность данных для агрегатов, поэтому мы решили не сильно из-за этого переживать.
В качестве планировщика заданий у нас выступает Azkaban. Я бы не сказал, что это суперудобный инструмент, но со своей задачей прекрасно справляется, в том числе когда речь идёт о выстраивании чуть более сложных pipeline-ов и когда одному скрипту нужно дождаться выполнения нескольких других.
Суммарное время, которое уходит на преобразование существующих сейчас событий в агрегаты, — 15 минут.
Тестирование
Каждое утро у нас запускаются автоматизированные тесты, которые отвечают на вопросы относительно сырых данных, а также готовности и качества агрегатов: «Проверь, что за вчера данных или уников по какому-нибудь срезу в сырых данных или в агрегатах не стало меньше более чем на полпроцента по сравнению с тем же днём неделю назад».
Технологически это обычные unit-тесты с использованием JUnit и реализации jdbc-драйвера к ClickHouse. Прогон всех тестов запускается в TeamCity и выполняется около 30 секунд в 1 поток, а в случае fail-ов мы получаем уведомления ВКонтакте от нашего замечательного TeamCity-бота.
Заключение
Используйте только стабильные версии ClickHouse, и ваши волосы будут мягкими и шелковистыми. Стоит ещё добавить, что ClickHouse не тормозит.
Комментарии (11)
Tachyon
27.03.2019 06:23Только я вижу на КДПВ трёх зомбяков?
для тех кому лень листать вверхxeofus
27.03.2019 15:19Событие доставляется в буферную таблицу на нужный шард, исходя из остатка от деления некоторого hash от user_id на количество шардов в кластере. Далее буферная таблица сбрасывает данные в локальную ReplicatedReplacingMergeTree
Расскажите поподробнее пожалуйста как это реализовано? Есть какой-то тип буферной таблицы и как он сбрасывает данные периодически?
В общей сложности в каждой таблице у нас от 50 до 200 колонок, при этом во всех таблицах есть сервисные поля. Например, лог ошибки error_log — на самом деле ошибкой мы называем выход за диапазоны типа. На случай, если в поле с возрастом польются странные значения, выходящие за размеры типа
Можно еще поподробнее про это, если не затруднит? Вы как то обрабатываете ошибки? Имеется в виду, как вы ошибку записываете в отдельное поле?FranciscoSuarez Автор
27.03.2019 15:39Расскажите поподробнее пожалуйста как это реализовано? Есть какой-то тип буферной таблицы и как он сбрасывает данные периодически?
Речь идёт о движке Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
Из документации:
Данные сбрасываются из буфера и записываются в таблицу назначения, если выполнены все min-условия или хотя бы одно max-условие. min_time, max_time — условие на время в секундах от момента первой записи в буфер; min_rows, max_rows — условие на количество строк в буфере; min_bytes, max_bytes — условие на количество байт в буфере.
Обычно мы используем с такими параметрами:
Buffer(database, table, 2, 10, 10, 10000000, 10000000, 100000000, 100000000);
Можно еще поподробнее про это, если не затруднит? Вы как то обрабатываете ошибки? Имеется в виду, как вы ошибку записываете в отдельное поле?
У нас буквально в таблице есть has_errors Int8, который служит для быстрого фильтра на предмет наличия ошибок и их количества. Также есть error_log String — строковое поле…
Когда на бэкенде формируется запрос на вставку, то происходит валидация полей на предмет
а) соответствия заданному типу
б) на то, что значение не выходит за размер типа.
В случае, если что-то не выполняется, в error_log пишется сообщение с указанием, что именно пошло не так, само значение заменяется на default значение для типа данных, а получившаяся строка пытается вставиться.
Пример, если я в поле amount Int64 по каким-то причинам буду пытаться с backend вставить строку, то сам ивент вставится, а я получу has_error = 1, error_log = 'amount not numeric', а в поле amount = 0 — дефолтное значение типа.
alexkrash
28.03.2019 17:29Отличная статья, спасибо!
Скажите пожалуйста, какое количество типов продуктовых событий вы ожидаете?
Наш опыт проектирования подобной штуки показал, что при наличии более 300 событий начинает сильно деградировать система репликации, zookeeper, и т.д.FranciscoSuarez Автор
28.03.2019 20:47У меня есть наброски,- где-то 50 условных продуктовых структур. Допустим, я ошибаюсь и будет ещё 50. Так что, скорее всего, уложимся в 100
BearLion
А я правильно понимаю, что у вас в итоговом агрегате для одного пользователя осталась только одна строка. Грубо говоря, это результат запроса
select user_id, dt, count(), avg(),…
from table
group by user_id, dt
FranciscoSuarez Автор
Всё верно, в агрегате по пользователям для конкретного сервиса: 1 строка на 1 пользователя на 1 дату.
dt, user_id,… cols_about_user ....,… metrics…
BearLion
И этого уровня достаточно для аналитики?
1) Ведь вы сразу потеряли хотя бы что-то близкое к real-time
2) Теряется возможность посчитать достаточно много метрик, например перцентили, воронки.
FranciscoSuarez Автор
Сырые данные остаются и хранятся достаточное время — от месяца до бесконечности в зависимости от кол-ва событий. Как правило хватает для решения всех задач.