Авторы статьи: Данила Перепечин DanilaPerepechin, Дмитрий Чеклов dcheklov.

Здравствуйте.
Data management platform (DMP) — это наша любимая тема во всей истории про онлайн рекламу. RTB is all about the data.
В продолжение цикла рассказов о технологическом стеке Targetix (SSP, DSP), сегодня я опишу один из инструментов, входящих
в DMP — Keyword Builder.




Микросегментирование


Keyword Builder помогает рекламодателям создавать очень узкие аудиторные сегменты (их еще называют микросегментами). Для построения таких аудиторий мы решили использовать следующий механизм: рекламодатель задает список ключевых слов, а на выходе получает аудиторию, состояющую из тех пользователей, которые посещали страницы с указанными ключевыми словами или искали эти слова в поиске. С одной стороны это очень простой инструмент, с другой — он дает маркетологам довольно большие возможности для экспериментов.

Основное преимущество этого инструмента — полный контроль над созданием аудитории. Рекламодатель четко понимает, какие пользователи в итоге увидят рекламу. Например, можно создать аудиторию на основе такого списка ключевых слов: ford focus, opel astra, toyota corola.

Как это выглядит со стороны рекламодателя:



В первую очередь, для решения этой задачи мы от всех возможных поставщиков данных (Raw Data Suppliers) получаем clickstream (история посещения страниц пользователем). Данные приходят к нам в таком виде:

{ user_id; url }


Цели и требования


Основное требование, которое мы предъявляли к инструменту — скорость создания аудитории. Этот процесс не должен занимать больше 5 минут даже для самых высокочастотных слов. Также важно, чтобы рекламодатель в интерфейсе при указании ключевого слова мог оценить размер аудитории. Оценка размера должна происходить в реальном времени при вводе слов (не более 100 мс, как видно на видео выше).

Для лучшего понимания приведём полный список требований, предъявляемых к инструменту:
  • принцип локальности данных;
  • высокая доступность базы данных (high availability);
  • горизонтальная масштабируемость;
  • высокая скорость записи;
  • высокая скорость полнотекстового поиска;
  • высокая скорость поиска по ключу.

Правда к этим требованиям мы пришли уже после создания первой версии этого инструмента :)


Первая архитектура, которая учит, «как не надо делать»


Вначале использовали MongoDB и всё шло довольно неплохо.



В коллекцию Visitor History записывали данные о пользователе, в Page Index — о страницах. URL страницы сам по себе ценности не представляет — страницу надо скачать и извлечь ключевые слова. Тут возникла первая проблема. Дело в том, что коллекции Page Compiled сначала не было, а ключевые слова записывались в Page Index, но одновременная запись ключевых слов и данных от поставщиков создавала слишком высокую нагрузку на эту коллекцию. Поле Keywords обычно большое, по нему необходим индекс, а в MongoDB той поры (версия 2.6) существовал lock на коллекцию целиком при операции записи. В общем, пришлось вынести ключевые слова в отдельную коллекцию Page Compiled. Пришлось — ну и что же, проблема решена — мы рады. Сейчас уже трудно вспомнить количество и характеристики серверов… что-то порядка 50 shard'ов.



Для создания аудитории по ключевым словам мы делали запрос в коллекцию Page Compiled, получали список URL'ов, на которых встречались эти слова, с этим списком шли в коллекцию Visitor History и искали пользователей, которые посещали эти страницы. Работало всё хорошо (сарказм) и мы могли создать 5, а то и 10(!!!) аудиторных сегментов в сутки… если, конечно, ничего не упадёт. Нагрузка в то время была около 800 млн. datapoint'ов в сутки, TTL индекс — 2 недели: 800*14… данных было много. Работа спорилась и за 3 месяца нагрузка возросла вдвое. Но брать ещё N серверов для поддержания жизни этой странной конструкции было не комильфо.

Самый большой и очевидный минус этой архитектуры — вышеописанный запрос, связанный с получением списка URL. Результатом этого запроса могли быть тысячи, миллионы записей. А главное, по этому списку нужно было сделать запрос в другую коллекцию.

Прочие минусы архитектуры:
  • неудобная запись из-за глобального lock'а;
  • огромный размер индекса, который разрастался, несмотря на удаление данных;
  • сложность мониторинга большого кластера;
  • необходимость самостоятельно создавать упрощённую версию полнотекстового поиска;
  • отсутствие принципа локальности при обработке данных;
  • невозможность быстрой оценки размера аудитории;
  • невозможность добавления пользователей в аудиторию на лету.

Плюсы:
  • мы поняли все минусы.


К чему мы пришли


Вообще, с самого начала было понятно, что что-то пошло не так, но теперь это стало очевидно. Мы пришли к выводу, что необходима таблица, которая будет обновляться в режиме реального времени и содержать записи следующего вида:

user { 

    user_id; keywords[] 

}

Проанализировав возможные решения, мы выбрали не на одну базу данных, а их комбинацию с небольшим дублированием данных. Сделали мы это потому, что издержки на дублирование данных были не сопоставимы с тем, какие возможности нам давала эта архитектура.



Первое решение — это платформа полнотекстового поиска Solr. Позволяет создавать распределенный индекс документов. Solr — популярное решение, имеющее большое количество документации и поддерживаемое в сервисе Cloudera. Однако работать с ним как с полноценной БД не получилось, мы решили добавить в архитектуру распределённую колоночную БД HBase.

В начале решили, что в Solr в качестве документа будет выступать user, в котором будет индексируемое поле keywords со всеми ключевыми словами. Но так как мы планировали удалять старые данные из таблицы, то решили в качестве документа использовать связку user+date, что стало полем User_id: то есть каждый документ должен хранить все ключевые слова пользователя за день. Такой подход позволяет удалять старые записи по TTL-индексу, а также строить аудитории с разной степенью «свежести». Поле Real_Id — настоящий id пользователя. Это поле используется для агрегации в запросах с указанной длительностью интереса.

Кстати, чтобы не хранить лишние данные в Solr, поле keywords сделали только индексируемым, что позволило нам существенно сократить объем хранимой информации. При этом сами ключевые слова, как вы уже поняли, можно достать из HBase.

Особенности Solr, которые нам пригодились в данной архитектуре:
  • быстрая индексация;
  • компактный индекс;
  • полнотекстовый поиск;
  • оценка количества документов, соответствующих запросу;
  • оптимизация индекса после удаления данных (предотвращает его разрастание, как это часто бывает в других БД).

Данные, дублирующиеся в HBase и в Solr, приложения записывают только в HBase, откуда сервис от Cloudera автоматически дублирует записи в Solr с указанными атрибутами полей и TTL индексом.

Таким образом мы смогли сократить издержки, связанные с тяжёлыми запросами. Но это не все элементы, которые оказались нам необходимы и стали нашим фундаментом. В первую очередь мы нуждались в обработке больших объёмов данных на лету и тут как нельзя кстати пригодился Apache Spark с его streaming-функционалом. А в качестве очереди данных мы выбрали Apache Kafka, которая как нельзя лучше подошла для этой роли.

Теперь рабочая схема выглядит следующим образом:



1. Из Kafka данные выбирают два процесса. Функционал очереди позволяет читать её нескольким независимым процессам, каждый из которых имеет свой курсор.

2. PageIndexer — из записи {user_id: [urls]} использует только URL. Работает только с таблицей Page Compiled.
  • При поступлении datapoint'а, делает запрос в HBase для проверки, скачивалась ли уже страница или нет
  • Если страница не была обнаружена в базе, экстрагирует с неё ключевые слова
  • Записывает данные в HBase.

3. VisitorActionReceiver —Собирает сохраненную в кафке информацию о действиях пользователей за 30 секунд (batch интервал).
  • Удаление дубликатов из очереди
  • Создание записи вида {user_id: [urls]}
  • Запрос в HBase в таблицу Page Compiled и получение ключевых слов, создание записи вида {user_id: [keywords]}
  • Запрос в HBase в таблицу Visitor Keywords и получение уже содержащихся у пользователя ключевых слов
  • Объединение ключевых слов для пользователя
  • Запись в Solr и HBase

4. SegmentBuilder строит аудитории и добавляет в уже существующие новых пользователей, в реальном времени.
  • По таймеру проверяет наличие необработанных аудиторий.
  • Формирует запрос в Solr и получает пользователей для аудитории.
  • Записывает данные в Aerospike


Особенностью PageIndexer является выбор ключевых слов со страницы, для разных типов страниц — разные наборы слов.
Работает на 1-2 серверах 32GB RAM Xeon E5-2620, скачивает 15-30К страниц в минуту. При этом выбирая из очереди 200-400К записей.
А основным достоинством VisitorActionReceiver, то что помимо добавления записей в Solr/HBase, данные так же пересылаются в Segment Builder и новые пользователи добавляются в аудитории в реальном времени.
Порядок вызовов VisitorActionReceiver:
public static void main(String[] args) {    
    SparkConf conf = getSparkConf();
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(STREAMING_BATCH_SIZE));
    JavaPairInputDStream<String, byte[]> rawStream = Utils.createDirectStream(jssc, KAFKA_TOPIC_NAME, KAFKA_BROKERS);
    JavaPairDStream<String, String> pairUrlVid = rawStream.mapPartitionsToPair(rawMessageIterator -> convertRawMessage(rawMessageIterator));
    Utils.GetCountInStreamRDD(pairUrlVid, "before_distinct");
    JavaPairDStream<String,String> reducedUrlVid = pairUrlVid.reduceByKey((old_s, next_s) -> reduceByUrl(old_s, next_s), LEVEL_PARALLELISM).repartition(LEVEL_PARALLELISM).persist(StorageLevel.MEMORY_ONLY_SER());
    Utils.GetCountInStreamRDD(reducedUrlVid, "after_distinct");
    JavaDStream<SolrInputDocument> solrDocumentsStream = reducedUrlVid.mapPartitions(reducedMessages -> getKeywordsFromHbase(reducedMessages)).repartition(SOLR_SHARD_COUNT).persist(StorageLevel.MEMORY_ONLY_SER());
    Utils.GetCountInStreamRDD(solrDocumentsStream, "hbase");
    SolrSupport.indexDStreamOfDocs(SOLR_ZK_CONNECT, SOLR_COLLECTION_NAME, SOLR_BATCH_SIZE, solrDocumentsStream);
    jssc.start();
    jssc.awaitTermination();
}


Segment Builder формирует в Solr сложный запрос, где словам с разных тегов присваивается различный вес, а также учитывается длительность интереса пользователя (краткосрочный, среднесрочный, долгосрочный). Все запросы на построение аудитории делаются при помощи edismax query, которая возвращает вес каждого документа, относительно запроса. Таким образом в аудитории попадают действительно релевантные пользователи.
В HBase в секунду приходит около 20К запросов на чтение и 2.5К на запись. Объём хранения данных — ~100 ГБ. Solr занимает ~250 ГБ, содержит ~250 млн записей TTL индекс 7 дней. (Все цифры без учёта репликации).

Коротко напомним основные элементы инфраструктуры:
Kafka — умная и устойчивая очередь, HBase — быстрая колоночная БД, Solr — давно зарекомендовавший себя поисковый движок, Spark — распределённые вычисления, включая streaming, а главное, всё это находится на HDFS, прекрасно масштабируется, мониторится и очень устойчиво. Работает в окружении Cloudera.
Яркая иллюстрация устойчивости




Заключение


Может показаться, что мы усложнили рабочую схему большим разнообразием инструментов. На самом деле мы пошли по пути наибольшего упрощения. Да, одну монгу мы поменяли на список сервисов, но все они работают в той нише, для которой и создавались.
Теперь Keyword Builder действительно отвечает всем требованиям рекламодателей и здравому смыслу. Аудитории в 2.5 млн людей создаются за 1-7 минут. Оценка размера аудитории происходит в реальном времени. Задействовано всего 8 серверов (i7-4770, 32GB RAM). Добавление серверов влечёт за собой линейный рост производительности.

А получившийся инструмент вы всегда можете попробовать в ретаргетинговой платформе Hybrid.

Выражаю горячую благодарность пользователю dcheklov, за помощь в создании статьи и наставлении на путь истинный =).
Тема DMP начата и остаётся открытой, ждите нас в следующих выпусках.

Комментарии (20)


  1. Stas911
    19.08.2015 16:34

    Интересная статья, спасибо!

    Вопросы:
    — поставщики данных как отдают такие потоки технически?
    — по схеме непонятно (и не обозначено), где там Spark используется — как я понял, это PageIndexer и VisitorActionReceiver?


    1. Stas911
      19.08.2015 16:45

      В догонку:
      — на чем писали? scala?
      — почему Solr, a не ElasticSearch какой-нить?


      1. dcheklov
        19.08.2015 17:28

        — Поставщики по-разному отдают: мы ставим коды (в этом случае в Kafka информация уходит с наших же серверов), присылают по протоколу zeromq, либо http протоколу. В общем маршрут до Kafka проходит в большинстве случаев еще через какие-нибудь сервисы.

        — Spark — это VisitorActionReciever. Да не понятно, надо подправить.

        — Spark на Java писали. Кстати, в статье есть кусок кода.

        — Solr. Прямо скажем, из-за Cloudera. На уровне индекса Solr и ElasticSearch — это все Lucene. Возможно когда-нибудь попробуем ES для этой задачи, но если упремся в производительность Solr. На текущий момент все устраивает.


        1. Stas911
          19.08.2015 18:49

          А не могли бы подробнее рассказать про интерфейс Solr-HBase? Там полностью автоматом данные переливаются в Solr этим Cloudera-вским коннектором или руками нужно писать?


          1. DanilaPerepechin
            19.08.2015 23:47

            Cloudera-Lily, автоматически реплицирует данные из HBase в Solr, необходимо лишь указать соответствие для типов и названий полей. Так же, при помощи этого сервиса можно делать запросы в Solr, а возвращать документы HBase. Руками писать не надо, но если очень хочется, то можно


  1. NetMozg
    20.08.2015 01:16

    … а на выходе получает аудиторию, состояющую из тех пользователей, которые посещали страницы с указанными ключевыми словами или искали эти слова в поиске....

    1) А как ключевые слова извлекаются из текстов страниц? Словосочетаниями по одному слову, по два, по три и т.д.?
    2) В поиске — в локальном поиске на сайте?


    1. dcheklov
      20.08.2015 08:43

      1. Текст вынимается целиком абзацами, фильтруется и сохраняется в Solr. Заботу по поиску он уже берет на себя. Solr поддерживает большое количество операторов поиска.
      2. В поиске Google, Yandex, Mail. А как же шифровка referer? Не знаю, посмотрите в GA, увидите там небольшой процент нешифрованного трафика, 3-8%. В рамках тех объемов, что мы получаем количество поисковых запросов достаточно много. В статье мы намеренно опустили, как мы его обрабатываем, т.к. задача это простая, только текст усложнили бы.


  1. ser0t0nin
    20.08.2015 15:15

    Спасибо за статью! Возникло несколько вопросов:
    1) Решаете ли вы как-нибудь проблемы, связанные с омонимией/синонимией ключевых слов?
    2) В интернете есть страницы, содержание которых меняется со временем. По описанию функционала PageIndexer сложилось впечатление, что вы не обновляете ключевые слова страницы. Если это не так, сколько «живет» скачанная копия страницы или как вы определяете, что ее пора обновить?
    3) Прошу короткий комментарий по поводу того, какие все-таки слова с веб-страницы считаются ключевыми? Есть ли какой-то статистический отбор токенов по их важности — ручной стоп-лист (если есть — насколько большой?) tf-idf или что-то еще?


    1. DanilaPerepechin
      20.08.2015 17:33

      1) Список синонимов можно задать в Solr. С омонимами никак не боремся.
      2) Главные страницы, обкачиваются несколько раз в день, из-за частой смены контента. В остальном, наши исследования показали, что только 3% страниц меняют свой контент более чем на 20% в течении недели(не учитываем комментарии). По истечении недели, страница удаляется по TTL индексу, и если она всё ещё живая и с неё происходят показы, мы просто снова скачиваем её, как новую страницу.
      3) Title, h1-h5, meta для всех страниц, если определяется что это статья или страница текстового содержания, то и его соответственно, если галереи то alt тег для картинок и т. д. То есть почти всё то есть на страницах, суть не столько в этих словах, сколько в весах, которые им задаются при составлении ключевого запроса (их мы раскрывать не будем).
      Стоп-слова есть, опять же посредствам Solr, не очень большой список стандартные стоп слова русский + английский плюс 100-200 мусорных слов.
      TF-IDF есть, но не в этой задаче, тут я что то и понять не могу, куда его прикрутить. Для увеличения веса страницы, пожалуй да, можно попробовать, но пока такое не внедрено.


      1. ser0t0nin
        20.08.2015 17:46

        Спасибо за ответ!
        Я предполагал, что например tf-idf мог бы использоваться например при вычислении веса, формулу которого вы не хотите раскрывать )) В общем, вопросов больше нет


    1. dcheklov
      20.08.2015 17:57

      TF-IDF и прочие плюшки больше характерны для задачи классификации страниц. Здесь больше задача про пользователя.

      Есть мысли попробовать следующее:
      — Индексировать пользователей по LDA-топикам (то есть не все ключевые слова, а только те, что влияют на определение темы страницы)
      — Сделать расширенный поиск. Рекламодатель вводит ключевые слова, а поиск осуществляется по тому, в какие топики входят эти слова.


      1. ser0t0nin
        20.08.2015 18:21

        А нет случайно задачи ранжировать пользователей? Например, когда нужны не все, которые читали про Ford Focus, а ТОП-100k по какому-то скорингу?


        1. dcheklov
          21.08.2015 09:19

          В первой версии нет такой задачи. Дальше будем пробовать, развивать, тестировать.


  1. firstrow
    22.08.2015 22:03

    А как вы определаяете пользователей из разных источников? Это cookies или несколько «схем»?


    1. dcheklov
      24.08.2015 10:34

      С каждым поставщиком настроен cookie matching. Либо сразу наш пиксель на сайтах стоит


  1. el777
    07.09.2015 16:48

    Не очень понятно как работает Segment Bulder.
    Он собирает все интересы пользователя (т.е. теги), находит каких-то похожих по тегам пользователей, затем записывает в Aerospike id пользователя и список рекламных кампаний, которые ему надо показать, так чтобы, когда пользователь зайдет, AS отдал ему уже готовый список кампаний, которые надо показать этому пользователю?
    Где можно подробнее про него почитать?


    1. DanilaPerepechin
      08.09.2015 09:34

      Да, вы всё верно написали, в aerospike хранятся записи вида {Visitor_id: [ audience_id ] }.
      Так называемые «теги» это ключевые слова, которые искал или видел пользователь с определённой частотой, а аудитория создаётся не просто по подобию пользователей (так называемая кластеризация), а по заданным условиям. Эти условия задаются во время создания рекламной кампании.
      Боюсь, что подробнее про Segment Bulder, кроме этой статьи, нигде не прочесть, я попробую ответить на ваши вопросы в комментариях


      1. el777
        08.09.2015 19:48

        Спасибо. Очень интересно. Буду ждать статью.
        Как понимаю, грубо это выглядит так: если пользователь искал велосипед, потом зашел на сайт про них, затем написал в велофоруме, то у него 3 срабатывания по тегу «велосипед», значит, он скорее всего заинтересован ими и его можно добавлять в соответствующую аудиторию.

        А кластеризацию пробовали использовать?
        Или она дает результат малопригодный для ваших целей? То есть кластеры-то выделит, но, что можно им продать не очень понятно? Пробовали анализировать результат?


        1. DanilaPerepechin
          09.09.2015 10:22

          Кластеризацию пробовали и даже используем, для уменьшения размерности вектора в предикте клика, для составления look-alike и прочих подобных задач. Но продать что то конкретное абстрактному кластеру, представляется мне малореализуемым.


          1. el777
            15.09.2015 12:21

            Насколько хорошо повышает удается предсказывать? Какие алгоритмы используете?
            У меня есть ощущение, что такие абстрактные кластеры будут состоят из пользователей, которым в целом тема интересна, но к покупке они еще не готовы — поэтому они смотрят очень широко и каждый интерес не является ярковыраженным на этом фоне. Возможно, это аудитория, которую можно «подогреть»?