Алгоритм k-means хорошо известен и применяется когда надо быстро разделить массив данных на группы или т.н. "кластеры". Предполагается, что каждый элемент данных имеет набор численных метрик, и мы можем говорить как о позиции точки в некотором многомерном пространстве, так и о их взаимной близости.

k-means относится к категории EM-алогоритмов (Expectanion-Maximization), где мы попеременно определяем насколько правильно текущее разбиение точек на кластеры, а затем немного его улучшаем.

Этот достаточно простой алгоритм, был сформулирован ещё в 1950-х, и с тех пор реализован на самых разных языках программирования. Есть реализации для MySQL и Postgress, и даже для Excel.

Зачем Clickhouse

Есть сотни и тысячи реализаций k-means. Почему бы не взять какую-нибудь популярную python библиотеку, вытащить данные через драйвер базы данных, и обработать их в памяти питоновской программы?

Можно и так, если данных мало, и задача найти какую-нибудь особенную область на изображении, чтобы применить к ней более сложный алгоритм. Но бывает, что данных много. Или даже ОЧЕНЬ МНОГО. Да, Clickhouse - это про Big Data.

Big Data - это когда данных настолько много, что они никуда не помещаются. Не только в RAM - об этом даже не надо мечтать. Часто весь массив данных не помещаются даже на один сервер, и приходится ставить несколько (шардировать). Просто написать на sklearn.cluster.KMeans - это прочитать 1Tb данных куда-то в память питоновского процесса, а потом ещё и ещё раз, причем в 1 поток. Не самая лучшая идея. Поэтому на больших объемах данных применяют решения вида MapReduce, Apache AirFlow, Spark и прочие дорогие и солидные инструменты.

Я попробую сделать что-то сравнимое, только просто и дешево, пользуясь возможностями Сlickhouse. И чтобы не тормозило. Покажу как как можно эффективно работать с большими данными на примере одного простого алгоритма. Ведь Сlickhouse - это полноценная среда выполнения заданий по обработке, а не простое хранилище индексированных данных.

Основная сложность в этой задаче заключается в неоднократном чтении огромного массива данных с последующим вычислением евклидова расстояния и группировкой результатов на каждом шаге. В Clickhouse предусмотрено все, чтобы именно эти операции работали быстро и прозрачно для разработчика:

  • данные лежат в виде колонок - читаем только то, что нам нужно

  • данные лежат в компрессированном виде (lz4) - читаем в разы меньше, снижаем нагрузку на диск

  • данные читаются в 8 потоков (по умолчанию, конфигурируется) из отдельных файлов. Без каких-то усилий разработчика задействуется многоядерная архитектура современных серверов.

  • тип Tuple эффективно хранит точки многомерного пространства

  • специальная функция быстро считает евклидово расстояние

  • SQL код компилируется (LLVM) и работает со скоростью C++ кода

  • операция group by имеет более 40 различных оптимизаций и работает очень эффективно. Если группировка в памяти невозможна, то автоматически будет задействован диск.

  • если данные шардированны по нескольким серверам, то запрос будет отослан на все шарды, частичная группировка будет сделана распределенно, а результаты объединены. Без каких-то усилий со стороны разработчика.

Исходные данные

Данные могут находиться в любой таблице, а чтобы алгоритм имел к ним стандартный интерфейс предлагается подготовить обычное SQL View:

create or replace view YH as
select i, (toInt32(x),toInt32(y)) as Y
from sourceData;

Данные выбираются из таблицы sourceData, формируется две колонки - первичный ключ и Tuple из необходимого количества числовых координат. В данном случае используется две целочисленные координаты 2D пространства. Если данные шардированы, то такое view нужно создать на всех узлах кластера

Хранение центроидов

Centroids - ключевое понятие алгоритма k-means. Это центры кластеров, которые мы ищем и уточняем последовательными приближениями. Будем хранить в табличке.

create table WCR (
   ts DateTime,
   j Int32,
   C Tuple(Int32,Int32)
) engine = MergeTree
order by ts;

Храним все итерации, но так как у нас нет автоинкремента, то наш выбор - простой таймстемп. Номер кластера, и координаты центроида в 2D пространстве. Сортировка по времени.

Начальная инициализация

В простейшем случае центроиды инициализируются координатами случайно выбранных точек исходного набора данных. Сколько хотите кластеров - столько центроидов и нужно проинициализировать. Алгоритм очень уязвим к начальному расположению центроидов - может и не сойтись.

Поэтому в более сложном варианте инициализации (k-means++) центроиды инициализируются хоть и вероятностно, но уже с учетом расстояния между точками в пространстве. Утверждается, что так лучше. Так и будем делать.

Первый центроид выбираем случайным образом (тут число 40) :

insert into WCR select now(), 1, Y
from YH
limit 40,1; 

Последующие центроиды:

insert into WCR
select now(), (select j from WCR order by ts desc limit 1)+1 as j, y
from (
	select y,
	sum(d) over () as total,
	sum(d) over (rows between unbounded preceding and current row ) as cum
	from (
		select argMin(Y, L2Distance(Y,C) as dx2) as y, min(dx2) as d
		from YH
		cross join (select * from WCR order by ts desc limit 1 by j) as WCR
		where Y not in (select C from WCR)
		group by Y
	)
)
where total * (select rand32()/4294967295) < cum
order by cum
limit 1;

Этот insert запускается несколько раз, пока не наберется нужное количество центроидов.

Пересчет центроидов

  • вычисляем расстояние от каждой точки до всех центроидов

  • для каждой точки находим ближайший центроид, объединяем точки в группы

  • назначаем центр этой группы новой позицией центроида

  • повторяем пока движение центроидов не остановится (почти)

INSERT INTO WCR SELECT
now(),  j,
tuple(COLUMNS('tupleElement') APPLY avg) AS C
FROM (WITH ( SELECT  groupArray(j), groupArray(C)
			FROM (select j,C from WCR order by ts desc limit 1 by j) ) AS jC
			SELECT  untuple(Y), i,
			arraySort((j, C) -> L2Distance(C, Y), jC.1, jC.2)[1] AS j
			FROM YH
		 )
GROUP BY j
  • берем центроиды прошлого шага: select j,C from WCR order by ts desc limit 1 by j

  • делаем из них кортеж (tuple) с двумя массивами внутри (номер и позиция)

  • передаем эти массивы функции arraySort, которая для каждой точки сортирует центроиды по возрастанию расстояния от точки до центроида. Берем только первый элемент массива - номер ближайшего к точке центроида.

  • untuple(Y) развертывает координаты точки в отдельные столбцы. Их имена будут вида tupleElement(Y.1)

  • COLUMNS('tupleElement') - берет все столбцы по указанному regex

  • group by j - группируем все точки по номеру центроида

  • APPLY avg - применяет к выбранным столбцам аггрегатную функцию - берет среднее по каждой координате, вычисляя новую позицию центроида.

  • tuple() - полученный список отдельных (но уже аггрегированных) столбцов снова сворачивается в единый кортеж.

  • insert into - записываем новые центроиды и таймстемп в таблицу.

Результат

В таблице WCR сохраняются все шаги работы алгоритма. Есть таймстемп для каждого шага, есть позиция центроида. Чтобы вытащить необходимые для визуализации данные можно написать запрос, похожий на обычную итерацию, который вычислит принадлежность каждой точки к кластеру:

with tuple(COLUMNS('tupleElement')) as a
select a.1 as x,
		   if(j=1,a.2,null) as p1,
			 if(j=2,a.2,null) as p2,
			 if(j=3,a.2,null) as p3,
			 if(j=4,a.2,null) as p4,
			 if(j=5,a.2,null) as p5
from ( WITH ( SELECT  groupArray(j), groupArray(C)
			 FROM (select j,C from WCR order by ts desc limit 1 by j) ) AS jC
			 SELECT  untuple(Y), i,
			 arraySort((j, C) -> L2Distance(C, Y), jC.1, jC.2)[1] AS j
			 FROM YH
		 )

Заключение

Сам алгоритм работает быстро, с обычной для k-means точностью - трудные для человека ситуации (как на КДПВ) решает логично, а на явно разделенных группах точек - путается. Существуют алгоритмы и поновее, поинтереснее.

Целью статьи было показать как применять некоторые инструменты Сlickhouse для относительно сложной работы с большими данными.

Все описанные выше SQL запросы собраны в bash скрипт, который последовательно инициализирует центроиды, уточняет в цикле, и завершается при остановке блужданий. При желании можно переписать на любой удобный ЯП с учетом особенностей задачи и конкретной архитектуры хранения данных.

Проект доступен на Github

Идея реализации k-means на SQL была взята из  статьи сотрудников Terradata - Programming the K-means Clustering Algorithm in SQL Однако там всё написано на классическом SQL. Получилось очень сложно, с дополнительными генераторами SQL кода. Я попробовал сделать проще и лаконичнее, используя специфичный диалект SQL и функции Сlickhouse.

Имена переменных в SQL запросах в общих чертах соответствуют именам в статье. Там предложена интересная схема наименований, я в какой-то степени ей пытался соответствовать. Полезно, если вы хотите понять как работает алгоритм и его реализации.

Комментарии (3)


  1. li_bao
    13.01.2022 05:55
    +1

    Спасибо, отличная статья!


  1. konnectr
    13.01.2022 11:33

    Статья классная. Но складывается ощущение, что это быть как функция в кликхаусе


    1. btyshkevich Автор
      13.01.2022 11:38
      +1

      Процесс схождения EM любого алгоритма итеративный, а все функции должны выдавать результат сразу. Так что целиком алгоритм не получится. Да и не надо. Хорошо, что есть все составляющие (типа E2Distance) - и ядро алгоритма уложилось в 10 строк.

      Такая же ситуация с полнотекстовым поиском - есть все компоненты (токенизатор, стемминг и проч), но нет самого обратного индекса. Но его можно сделать в виде таблички.