Всем привет!

Всегда интересно узнавать, как устроено IT в различных сферах и компаниях: какие задачи ставятся и как находятся решения. Да и в целом, делиться опытом полезно!

Меня зовут Илья Панов, я инженер данных в X5 Tech и участвую в развитии продукта CVM (Customer Value Maximization). В этой статье хочу рассказать о том, как живёт и развивается IT-продукт в ритейле. Расскажу, откуда появилась потребность в инструменте, подобном ClickHouse, и как проходило RnD для его внедрения.

Что такое CVM?

Для понимания потребности в использовании ClickHouse необходимо сначала немного рассказать про проект: что он делает, как работает с данными и откуда их берёт. Буквально самый базовый минимум, чтобы погрузить в контекст.

Итак, Customer Value Maximization - сложное название, которое с маркетологского языка можно перевести примерно так: использование знаний о клиенте для увеличения финансовых показателей и общего дохода компании при соблюдении интересов и вовлечённости самого клиента. В компании принято клиентов (покупателей) называть гостями. Знание о гостях следует понимать наиболее широко. Это, во-первых, данные, лежащие на поверхности - их непосредственно сообщает или генерит сам гость: анкетные и чековые данные. А, во-вторых, это знания о поведении и предпочтениях каждого гостя:

  • какие маркетинговые предложения и их механики лучше работают;

  • какие каналы коммуникации наиболее эффективны и многое другое.

Все эти знания помогают маркетологам формировать и запускать кампании для определённого сегмента гостей. А также, и это крайне важно, оценивать эффекты после запуска, что добавляет ещё больше знания. Получается такая сложная система обучения с подкреплением. При этом гость уже вовлечён в программу лояльности и начинает принимать участие в маркетинговой кампании в тот момент, когда по некоторому каналу связи (например, через пуш-уведомление в приложении) получает сообщение с оффером: "Привет, {guestName}! Вы любите сладости, поэтому специальное предложение только для вас: купите 2 шоколадки с увеличенным кэшбэком."

Собственно, для построения подобной системы в CVM создаётся и развивается программная платформа (Платформа CVM), позволяющая конечному пользователю (маркетологу) использовать и увеличивать знания о клиентах. Надеюсь, в обозримом будущем коллеги расскажут о платформе CVM и её инженерных и аналитических особенностях в отдельной статье, а я перейду ближе к данным.

Взаимодействие CVM с внешним миром
Взаимодействие CVM с внешним миром

Корпоративные данные и где они обитают

Как, наверное, и в любой другой большой компании, данные X5 хранятся в Data Warehouse. Для массового доступа эти данные подливаются в озеро. Озеро данных в X5 реализовано на основе кластера Hadoop и множества витрин в Hive. Таким образом, любой продукт, в том числе и CVM, для работы с данными ходит в корпоративный Hadoop, где совершает расчёты, хранит свои витрины и забирает нужные данные в другие системы хранения.

Достаточно естественный подход: толпы разных юзеров из множества продуктов (аналитики, дата инженеры и саентисты) используют общедоступные данные и что-то считают с помощью Spark.

Типичный процесс обработки данных в Hadoop с использованием Spark
Типичный процесс обработки данных в Hadoop с использованием Spark

Витрина сегментатора

В самом сердце, мне так хочется верить, платформы CVM работает сервис "Сегментатор". Он позволяет пользователю выбирать сегмент гостей на основе более 200 признаков. Признаки хранятся в витрине cегментатора, которая каждый день обновляется. В этой витрине есть как анкетные признаки:

  • возраст / пол и т. д.;

  • разрешения на коммуникации;

так и признаки из чеков, а также рассчитанные на их основе, например:

  • средний РТО - розничный товарооборот - за 15/30/90/… дней;

  • количество покупок за 15/30/90/… дней;

  • любимый магазин / город / регион.

Каждый признак (или группа признаков) рассчитывается отдельным методом, а затем все данные объединяются в витрину. Подход, который позволяет за приемлемое время выполнять расчёт витрины, описан в моей прошлой статье: Параллельные вычисления в Apache Spark.

Фрагмент витрины сегментатора
Фрагмент витрины сегментатора

Гибкие фильтры

Понятно, что признаки вида "средний чек за 15/30/90/… дней" - это совсем не гибко! Пользователям среднего РТО за 30 дней не достаточно - кому-то может потребоваться РТО за 32 дня. Или 63 дня. А ещё было бы полезно выбирать гостей из динамического рейтинга их покупок: нужен сегмент гостей, у которых молоко и фрукты входят в топ 3 покупок за 23 дня.

Очевидно, что для решения подобной задачи необходим инструмент, назовём его "Движок гибких фильтров", который позволит создавать методы-признаки, для которых невозможен предрасчёт в витрине.

Можно сформулировать основные требования к движку гибких фильтров:

  • доступ к исходным чековым данным: миллиарды записей / сотни Гб;

  • выполнение агрегаций sum/count/...;

  • скорость выполнения – не более 5-10 минут.

Кажется, что всё очень просто:

  • берём Spark и пишем под него методы;

  • для некоторых методов, возможно, понадобится некоторый предрасчёт;

  • оборачиваем всё в сервис;

  • done!

Но уже на первых не самых сложных методах Spark показывал не очень быстрый результат, а порой неприлично долгий.

Из инструментов, решающих схожий класс задач, известен ClickHouse. Однако, экспертизы в его использовании (как по devOps'ой части, так и по дата инженерной) на момент разработки гибких фильтров не было. Во многих статьях про Клик говорится, как он хорош для быстрой агрегации вагонов данных для различных отчётов и статистик. Но совершенно неочевидно, как он поведёт себя в роли инструмента для получения списков гостей из методов со сложной логикой.

Проведение RnD: Spark vs ClickHouse

Если что-то неочевидно, то нужно провести RnD и во всём разобраться! И, конечно же, нужно сформулировать цели, которые необходимо достичь в ходе исследования:

  1. Проверить пригодность ClickHouse для указанной задачи на реальных данных.

  2. Получить экспертизу в работе с ClickHouse. Лишним не будет.

  3. Сравнить производительность решения в ClickHouse с аналогичным на Spark.

Первые две цели - разобраться с кликом, решая задачу, - с инженерной точки зрения естественны и понятны. Но как сравнить два инструмента, которые на первый взгляд вообще из разных вселенных? Spark - максимально гибкий, фактически промышленный стандарт в области больших данных, ему доступны ресурсы внушительного кластера.

ClickHouse... Ну, во-первых, его ещё даже нет - т.е. его необходимо сначала где-то развернуть. Из представления объёмов данных, разворачивать клик следует сразу в виде кластера. Каковы должны быть размеры этого кластера, чтобы сравнивать со Спарком? Много неопределённости и неизвестных.

Для начала было принято решение взять самый сложный гибкий метод. Для этого метода подготовить данные до такого состояния, чтобы можно было сказать: "Всё, что только возможно, вынесено в предрасчёт." Таким образом, для сравнения осталась процедура, которую можно одинаково воспроизвести в Спарке и Клике.

А вот и сам метод: "Необходимо выбрать гостей, у которых в топ N рейтинга категорий покупок за D дней до их последней покупки входят категории из списка L". Да, сложно скомпилировать в голове, что же нужно сделать, поэтому опишу метод в псевдокоде на человеческом языке:

def get_guests_by_rate(
  	топ N: int,
  	D дней: int
  	список категорий: List[str] ) -> List[guest_id]:

  * Для всех гостей

  1. Получить дату последней покупки
  2. Выбрать все покупки за D дней до последней
  3. Для каждой категории товара посчитать суммарное РТО
  4. Построить рейтинг категорий по сумме РТО
  5. Выбрать гостей, у которых в топ N входят заданные категории

Таким образом, для проведения теста чековые данные можно привести к такому виду, где уже выполнен предрасчёт - для каждого гостя и чека определён следующий параметр:

  • для каждого чека подсчитана разница для дат - date_diff - разница между датой последней покупки и датой чека. Это позволяет для параметра D сразу выбрать только необходимые данные: where date_diff <= D

Т.е. тестовый метод сводится к такой задаче:

  1. выбрать транзакции по заданной глубине поиска (D);

  2. построить рейтинги категорий для каждого гостя;

  3. выбрать гостей, у которых в топ N рейтинга есть запрошенные категории.

Фрагмент широкой таблицы транзакций в ClickHouse
Фрагмент широкой таблицы транзакций в ClickHouse

Построение рейтингов и простой фильтр в выборке - это задача достаточно простая и для Спарка, и для Клика. Осталось только проверить, кто быстрее, а может и вовсе они равны.

Тестовый стенд ClickHouse

Только вот чтобы проверить кто быстрее, сначала нужно подготовить второго участника. Вместе с коллегой devOps'ом мы решили развернуть мини кластер ClickHouse, состоящий из трёх нод, в корпоративном облаке. Т. е. да, Клик был запущен в виртуальных машинах, которые неизвестно с кем соседствовали. Для первого подхода никакой репликации включено не было, только разделение данных на три шарда, по количеству нод.

Было опасение, что загрузка данных в Клик потребует каких-то особых манипуляций: что-то вроде перекладывания данных куда-нибудь, чтобы потом специальными утилитами их вгружать. Но нет, оказалось - можно совершенно легко загружать данные непосредственно из Спарка. Для ClickHouse существует JDBC-драйвер (и даже не один). И загрузка выглядит примерно так:

headers = spark.table("headers")
items = spark.table("items")

transactions = headers.join(items)

(transactions
  .write.jdbc(
	"jdbc:clickhouse://host:port",
	table="transactions"
  )
)

Чековые данные хранятся не в единой витрине, а разделены на две большие примерно так:

  • в Headers хранится то, что в чеке записано в области "Итого" - общая информация о покупке;

  • в Items сохранены отдельные позиции из всего списка чека.

В Клик загружается, так сказать, широкая таблица с транзакциями. Копия этих данных сохранена в HDFS для Спарка.

Выполнение запросов к Клику легко реализуется через клиент, версии которого существуют для множества языков. В частности, для Python доступ к данным выглядит примерно так:

from clickhouse_driver import Client

click = Client(credentials)

SQL = """
	SELECT
  	guest_id,
  	sum(rto) as sum_rto
	FROM transactions
	GROUP BY guest_id
	<...>
	WHERE sum_rto > 1000
"""

features: pd.DataFrame = click.query_dataframe(SQL)

Забегая вперёд, скажу, что ClickHouse впечатлил, поэтому для более глубокого теста были добавлены ещё две виртуальные машины, чтобы включить репликацию и увеличить объём данных. Для исследования репликация была настроена по циклическому принципу: каждая нода хостит не только реплику "своего" шарда, но и реплику своего "соседа". Так получается, что каждая нода обслуживает две разные реплики. Конечно, для прода так делать не стоит, но для RnD - вполне годится: можно и нагрузку потестить, и отказоустойчивость.

Схема тестового стенда
Схема тестового стенда

Результаты RnD

Итак, эксперимент. Интрига, если она вообще была, уже нарушена - Клик быстрее. Во всех тестируемых методах ClickHouse показал заметное превосходство. На простых методах это превосходство оказывалось в разы.

Что же касается метода, который был главным в тестировании, то вот SQL скрипт для его реализации:

WITH ranks AS(
  SELECT
	guest_id,
	sum(item_rto) AS sum_rto,
	item_category
  RANK(sum_rto) OVER w AS category_rank
  FROM transactions
  WHERE date_diff <= {days_from_last_purch}
  GROUP BY guest_id, item_category
  WINDOW w AS (PARTITION BY guest_id ORDER BY sum_rto DESC)
)

SELECT DISTINCT guest_id  
FROM ranks
WHERE category_rank <= {top}
  AND item_category IN {requested_categories}

А вот график, на котором наглядно изображено сравнение двух инструментов. На графике время выполнения - это медиана для множества измерений времени выполнения метода при фиксированной глубине поиска (30 дней, 90, 180 и 365).

Из графика видно, что время выполнения в Спарке уходит в космос при увеличении глубины поиска, т. е. увеличении объёма данных, необходимых в расчёте.

Для кого-то может быть очевидным, что Клик быстрее в задачах подобного класса, где необходимо выполнить некоторые функции-агрегации. Но всё же почему Spark показал такие, очевидно, плохие результаты и оказался непригодным как движок гибких фильтров? А, может, вовсе не Spark виноват?

Почему Spark медленный

Попробуем посмотреть, как долго происходит чтение. Без каких-либо вычислений, только чтение данных.

df = (spark.table("Таблица с чеками")
  .filter(F.col("дата чека") >= "2022-01-01")
  .filter(F.col("дата чека") < "2022-04-01")
  .select(["trx_id"])
)

df.count()

Выполняем count записей за три месяца и видим такой результат в веб-интерфейсе Spark'а:

Достаточно долго. Но и количество партиций/файлов, которые пришлось прочитать, тоже велико - более полутора тысяч. Посмотрим, где и как выполнялись task'и для этого count'а:

Видим, что локальность данных (Data Locality) не максимальная, т. е. процессу, в котором Spark-worker выполняет задачу, необходимо читать данные, физически расположенные не на той же ноде, а где-то в кластере HDFS. Да, конечно, в кластере есть конкуренция за ресурсы, что тоже сказывается на времени выполнения.

И действительно, если так же проанализировать выполнение гибкого метода, то для глубины поиска в 30 дней из 6 минут выполнения 4 минуты приходится на чтение данных. А сами вычисления занимают примерно 2 минуты, что предельно близко к результату в ClickHouse.

Таким образом, из-за инженерных особенностей большого Hadoop кластера чтение данных может занимать достаточно продолжительное время. Для подавляющего числа задач это совсем не является критичным. Но для построения сервиса, для которого время выполнения важно, это непригодно. И, видимо, в этом оказывается главное преимущество ClickHouse - данные шарда хранятся локально на той же самой ноде, куда приходит фрагмент распределённого SQL-запроса.

Заключение

К настоящему времени, с момента начала RnD прошло более года. За это время ClickHouse перебрался в продовую среду, в более просторное облако: теперь реплики с разных шардов не ютятся на одной ноде. Коллеги пополняют сервис гибких фильтров новыми методами, а пользователи платформы CVM активно используют эти методы для запуска кампаний. Более 90% гибких методов выполняются менее минуты.

Стоит сказать несколько слов про сам Клик. Его использование требует некоторой подготовки. SQL-язык совсем немного роднит его с привычными реляционными СУБД. Но ClickHouse - это распределённый инструмент больших данных, поэтому всегда следует держать в голове ещё одно измерение - не только корректную логику SQL-скрипта, но и как его выполнение будет распределяться в кластере, и, к тому же, будет ли вообще распределяться. Время выполнения двух SQL-скриптов с одинаковой логикой может катастрофически отличаться, если не учесть дополнительное измерение - распределение данных в кластере.

Но всё же он действительно хорош для класса задач, для которого создан. Кто бы сомневался ;)

Комментарии (2)


  1. PaulIsh
    26.06.2023 07:02

    Не совсем по теме, но по примеру. Иногда покупатели разбивают одну покупку на несколько чеков (оплата с разных карт, повторный заход в магазин, так как что-то забыли, ребенок на кассе что-то тянет к покупке, а вы уже в процессе оплаты). Полагаю, такие чеки можно объединять в один виртуальный.


    1. ilya-panov Автор
      26.06.2023 07:02

      Технически, конечно, можно объединять чеки с заданной дельтой между датами покупки. Но пока ниразу не слышал о задаче, где были востребованы такие "допокупки".