Долгое время в iFunny использовался Redshift в качестве базы данных для событий, которые происходят в сервисах бэкенда и мобильных приложениях. Его выбрали потому, что на момент внедрения, по большому счёту, не было альтернатив, сравнимых по стоимости и удобству.

Однако всё изменилось после публичного релиза ClickHouse. Мы долго его изучали, сравнивали стоимость, прикидывали примерную архитектуру и вот, наконец, этим летом решились посмотреть, насколько он нам полезен. Из этой статьи вы узнаете о том, какую проблему нам помогал решить Redshift, и как мы перенесли это решение на ClickHouse.

Проблема


iFunny требовался сервис, похожий на Яндекс.Метрику, но исключительно для внутреннего потребления. Объясню почему.

Внешние клиенты пишут события. Это могут быть мобильные приложения, веб-сайты или внутренние сервисы бэкенда. Этим клиентам очень сложно объяснить, что сейчас сервис приёма событий недоступен, «попробуйте отправить минут через 15 или через час». Клиентов много, события они хотят отправлять постоянно и ждать совершенно не могут.

В противовес им есть внутренние сервисы и пользователи, которые достаточно толерантны в этом плане: они могут правильно работать даже с недоступным сервисом аналитики. А большинство продуктовых метрик и результатов A/B-тестов вообще имеет смысл смотреть только раз в сутки, а может, и того реже. Поэтому требования к чтению достаточно низкие. В случае аварии или обновления мы можем себе позволить быть недоступными или неконсистентными по чтению несколько часов или даже дней (в особо запущенном случае).

Если говорить о цифрах, то нам нужно принимать около пяти миллиардов событий (300 Гб сжатых данных) в сутки, при этом хранить данные за три месяца в «горячем», доступном для SQL-запросов виде, а в «холодном» — года за два или более, но чтобы в течение нескольких дней мы смогли их превратить в «горячие».

В основном данные представляют собой набор событий, упорядоченных по времени. Типов событий около трёх сотен, каждый имеет свой набор свойств. Ещё есть немного данных из сторонних источников, которые должны синхронизироваться с базой данных аналитики: например, коллекция инсталляций приложения из MongoDB или внешний сервис AppsFlyer.

Получается, что под базу данных нам требуется примерно 40 ТБ диска, а под «холодное» хранилище — ещё около 250 TБ.

Решение на Redshift




Итак, есть мобильные клиенты и сервисы бэкенда, от которых нужно принимать события. Данные принимает HTTP-сервис, проводит минимальную валидацию, собирает на локальном диске события в файлы, сгруппированные по одной минуте, сразу же сжимает и отправляет их в S3-бакет. Доступность этого сервиса зависит от доступности серверов с приложением и AWS S3. Приложения не хранят состояния, поэтому легко балансируются, масштабируются и взаимозаменяются. S3 относительно простой сервис хранения файлов, имеющий хорошую репутацию и доступность, поэтому на него можно положиться.

Дальше нужно как-то доставить данные в Redshift. Тут всё достаточно просто: Redshift имеет встроенный S3-импортер, который является рекомендуемым способом загрузки данных. Поэтому раз в 10 минут стартует скрипт, который подключается к Redshift и просит его загрузить данные по префиксу s3://events-bucket/main/year=2018/month=10/day=14/10_3*

Для того чтобы отслеживать состояние задачи загрузки, мы используем Apache Airflow: он позволяет повторять операцию в случае ошибок и иметь наглядную историю выполнения, что важно при большом количестве таких задач. А в случае проблем можно повторить загрузку за какие-то временные интервалы или загрузить «холодные» данные из S3 годичной давности.

В этом же Airflow точно так же, по расписанию, работают скрипты, которые подключаются к базе данных и выполняют периодические загрузки из внешних хранилищ, либо строят агрегации над событиями в виде INSERT INTO ... SELECT ...

Redshift имеет слабые гарантии доступности. Раз в неделю, на срок до получаса (в настройках указывается временное окно) AWS может остановить кластер на обновление или любые другие плановые работы. При аварии на одной ноде кластер также становится недоступен до восстановления хоста. Обычно это занимает около 15 минут и случается примерно раз в полгода. В текущей системе это не является проблемой, она изначально была рассчитана на то что база будет периодически недоступна.

Под Redshift использовалось 4 инстанса ds2.8xlarge (36 CPU, 16 TБ HDD), что в сумме даёт нам 64 ТБ дискового пространства.

Последний момент — это резервное копирование. Расписание резервных копий можно указать в настройках кластера, и работает оно прекрасно.

Мотивация перехода на ClickHouse


Конечно, если бы не было никаких проблем, никто бы и не подумал о миграции на ClickHouse. Однако, они были.

Если посмотреть на схему хранения ClickHouse с движком MergeTree и Redshift, то можно заметить, что их идеология очень похожа. Обе базы данных являются колоночными, прекрасно работают с большим числом столбцов и очень хорошо сжимают данные на диске (а в Redshift можно настроить типы компрессии для каждой отдельной колонки). Даже данные хранятся одинаково: они отсортированы по первичному ключу, что позволяет вычитывать только конкретные блоки и не держать отдельные индексы в памяти, а это важно при работе с большими объёмами данных.

Существенная разница, как всегда, в деталях.

Таблица на каждый день


Сортировка данных на диске и фактическое удаление в Redshift происходит в момент, когда вы выполняете:
VACUUM <tablename>
При этом процесс вакуума работает со всеми данными в этой таблице. Если в одной таблице хранить данные за все три месяца, этот процесс занимает неприличное количество времени, а выполнять его нужно хотя бы ежесуточно, потому как удаляются старые данные и добавляются новые. Приходилось строить отдельные таблицы на каждый день и объединять их через View, а это не только сложность в ротации и поддержке этой View, но и замедление запросов. При запросе, судя по explain, сканировались все таблицы. И хотя сканирование одной таблицы занимает меньше секунды, при их количестве в 90 штук получается, что любой запрос занимает как минимум минуту. Это не очень удобно.

Дубликаты


Следующая проблема — это дубликаты. Так или иначе, при передаче данных по сети есть два варианта: либо терять данные, либо получать дубликаты. Терять сообщения мы никак не могли, поэтому просто смирились с тем, что какой-то небольшой процент событий будет продублирован. Удалить дубликаты за сутки можно через создание новой таблицы, вставку в неё данных из старой, где с помощью оконной функции удалены строки с повторяющимися id, удалением старой таблицы и переименованием новой. Так как поверх дневных таблиц была view, нужно было про неё не забыть и удалить на время переименования таблиц. При этом нужно было следить ещё и за блокировками, иначе, в случае появления запроса, который блокировал view или одну из таблиц, этот процесс мог сильно затянуться.

Мониторинг и обслуживание


Ни один запрос в Redshift не занимает меньше пары секунд. Даже если вы просто хотите добавить пользователя или посмотреть список активных запросов, вам придётся подождать пару десятков секунд. Конечно, можно потерпеть, и для такого класса баз данных это приемлемо, но в итоге выливается в кучу потерянного времени.

Стоимость


По нашим расчётам, развернуть ClickHouse на AWS-инстансах с точно такими же ресурсами получается ровно в два раза дешевле. Конечно, так и должно быть, ведь используя Redshift, вы получаете готовую базу данных, к которой сразу же после нажатия пары кнопок в консоли AWS можно подключиться любым клиентом PostgreSQL, а всё остальное AWS сделает за вас. Однако стоит ли оно того? У нас уже есть инфраструктура, мы вроде бы умеем делать бэкапы, мониторинг и конфигурацию, и делаем это для кучи внутренних сервисов. Почему бы не заняться поддержкой СlickHouse?

Процесс перехода


Для начала подняли маленькую инсталляцию ClickHouse из одной машины, куда стали периодически, с помощью встроенных средств, загружать данные из S3. Таким образом, получили возможность проверить наши предположения по поводу скорости и возможностей ClickHouse.

После пары недель тестов на небольшой копии данных, стало понятно, что для замены Redshift на Clickhouse придётся решить несколько вопросов:

  • на каких типах инстансов и дисках разворачивать;
  • использовать ли репликацию?
  • как установить, сконфигурировать и запустить;
  • как делать мониторинг;
  • какая именно будет схема;
  • как доставлять данные из S3;
  • как переписать все запросы со стандартного SQL на нестандартный?

Типы инстансов и диски. В количестве процессоров, диска и памяти решили отталкиваться от текущей инсталляции Redshift. Было несколько вариантов, в том числе i3-инстансы с локальными NVMe-дисками, но остановиться решили на r5.4xlarge и хранилище в виде 8T ST1 EBS на каждый инстанс. По подсчётам, это должно было дать сравнимую с Redshift производительность за вдвое меньшую стоимость. При этом за счёт использования EBS-дисков мы получаем простые резервные копии и восстановление через снапшоты дисков, почти как в Redshift.

Репликация. Так как отталкивались от того, что уже сейчас есть в Redshift, решили не использовать репликацию. К тому же это не заставляет сразу изучать ещё и ZooKeeper, которого в инфраструктуре пока нет, но отлично, что теперь появилась возможность сделать репликацию по требованию.

Установка. Это самая простая часть. Достаточно небольшой роли Ansible, которая установит уже готовые RPM-пакеты и сделает одинаковую конфигурацию на каждом хосте.

Мониторинг. Для мониторинга всех сервисов используется Prometheus вместе с Telegraf и Grafana, поэтому просто поставили агентов Telegraf на хосты с ClickHouse, собрали в Grafana дашборд, где показывалась текущая загрузка серверов по процессору, памяти и дискам. Через плагин к Grafana вывели на этот дашборд текущие активные запросы по кластеру, состояние импорта из S3 и другие полезные вещи. Получилось даже лучше и информативнее (и существенно быстрее), чем тот дашборд, что давала консоль AWS.

Схема. Одной из наших самых главных ошибок в Redshift было вынести в отдельные колонки только основные поля событий, а поля, которые редко используются, сложить
в одну большую колонку properties. С одной стороны, это дало нам гибкость в изменениях полей на начальных этапах, когда не было полного понимания, какие именно события мы будем собирать, с какими свойствами, к тому же они менялись по 5 раз за день. А с другой стороны, запросы по большой колонке properties занимали всё больше и больше времени. В ClickHouse мы решили сразу сделать правильно, поэтому собрали все возможные колонки и вписали им оптимальный тип. Получилась таблица примерно с двумя сотнями колонок.

Следующей задачей было выбрать правильный движок для хранения и партицирования.
С партицированием снова не стали думать, а сделали так же, как и было в Redshift, — по партиции на каждый день, но теперь все партиции являются одной таблицей, что
существенно ускоряет запросы и упрощает обслуживание. Движок для хранения взяли ReplacingMergeTree, так как он позволяет удалять дубликаты из определённой партиции, просто выполнив OPTIMIZE… FINAL. Кроме того, посуточная схема партицирования позволяет в случае ошибок или аварий, работать только с данными за сутки, а не месяц, что существенно быстрее.

Доставка данных из s3 в ClickHouse. Это был один из самых долгих процессов. Просто сделать загрузку встроенными средствами ClickHouse не получилось, потому что данные на S3 лежат в JSON, каждое поле нужно достать по своему jsonpath, как мы это делали в Redshift, а иногда ещё и применять трансформацию: например, UUID сообщения из стандартной записи в виде DD96C92F-3F4D-44C6-BCD3-E25EB26389E9 сконвертировать в байты и положить в тип FixedString(16).

Хотелось иметь специальный сервис, аналогичный тому, что мы имели в Redshift в виде команды COPY. Ничего готового не нашли, поэтому пришлось его сделать. О том, как он работает, можно написать отдельную статью, но если вкратце, то это HTTP-сервис, который развёрнут на каждом хосте с ClickHouse. Обращаться можно к любому из них. В параметрах запроса указывается префикс S3, из которого забираются файлы, список jsonpath для преобразования из JSON в набор колонок, а также набор преобразований для каждой колонки. Сервер, к которому пришёл запрос, начинает сканировать файлы на S3 и раздавать работу по парсингу остальным хостам. При этом нам важно, чтобы строки, которые не удалось импортировать, вместе с ошибкой складывались в отдельную таблицу СlickHouse. Это очень помогает расследовать проблемы и баги в сервисе приёма событий и клиентах, которые эти события генерируют. С размещением импортёра прямо на хостах базы данных, мы утилизировали те ресурсы, которые, как правило, простаивают, потому что сложные запросы к ним идут не круглосуточно. Конечно, если запросов станет больше, можно всегда вынести сервис импортёра на отдельные хосты.

С импортом данных из внешних источников больших проблем не возникло. В тех скриптах, которые были, просто изменили назначение с Redshift на ClickHouse.

Был вариант подключить MongoDB в виде словаря, а не делать ежесуточные копирования. К сожалению, он не подошёл, потому, что словарь обязательно должен помещаться в память, а размеры большинства коллекций в MongoDB этого не позволяют. Но словари тоже нам пригодились: с их помощью очень удобно подключать GeoIP базы данных от MaxMind и использовать в запросах. Для этого используем layout ip_trie и CSV файлы, которые предоставляются сервисом. Например, конфигурация словаря geoip_asn_blocks_ipv4 выглядит так:


<dictionaries>
   <dictionary>
      <name>geoip_asn_blocks_ipv4</name>
      <source>
         <file>
            <path>GeoLite2-ASN-Blocks-IPv4.csv</path>
            <format>CSVWithNames</format>
         </file>
      <\/source>
      <lifetime>300</lifetime>
      <layout>
         <ip_trie />
      </layout>
      <structure>
         <key>
            <attribute>
               <name>prefix</name>
               <type>String</type>
            </attribute>
         </key>
         <attribute>
            <name>autonomous_system_number</name>
            <type>UInt32</type>
            <null_value>0</null_value>
         </attribute>
         <attribute>
            <name>autonomous_system_organization</name>
            <type>String</type>
            <null_value>?</null_value>
         </attribute>
      </structure>
   </dictionary>
</dictionaries>

Этот конфиг достаточно положить в /etc/clickhouse-server/geoip_asn_blocks_ipv4_dictionary.xml, после чего можно делать запросы к словарю, чтобы получить название провайдера по IP адресу:

SELECT dictGetString('geoip_asn_blocks_ipv4', 'autonomous_system_organization', tuple(IPv4StringToNum('192.168.1.1')));

Изменение схемы данных. Как упоминалось выше, решили пока не использовать репликацию, так как сейчас можем себе позволить стать недоступными при авариях или запланированных работах, а копия данных уже лежит на s3 и мы можем перенести ее в ClickHouse за разумные сроки. Если нет репликации, то и ZooKeeper разворачивать не стали, а отсутствие ZooKeeper ведет и к невозможности использовать выражение ON CLUSTER в DDL запросах. Эта проблема решилась небольшим скриптом на python, который подключается к каждому хосту ClickHouse (пока их всего восемь) и выполняет указанный SQL-запрос.

Не полная поддержка SQL в ClickHouse. Процесс перекладывания запросов из синтаксиса Redshift в синтаксис ClickHouse шёл параллельно с разработкой импортёра, и занималась им в основном команда аналитиков. Как это ни странно, но дело оказалось даже не в JOIN, а в оконных функциях. Чтобы понять, как их можно сделать через массивы и лямбда-функции, ушло несколько дней. Хорошо, что этот вопрос достаточно часто освещается в докладах про ClickHouse, которых огромное количество, например, events.yandex.ru/lib/talks/5420. В этот момент данные уже писались сразу в два места: и в Redshift, и в новый ClickHouse, поэтому при переносе запросов мы сравнивали результаты. Сравнить скорость было проблематично, так как мы убрали одну большую колонку properties, и большинство запросов стало работать только с нужными колонками, что, естественно, дало существенный прирост, но те запросы, где колонка properties не участвовала, работали так же, либо чуть быстрее.

В итоге получилась такая схема:



Результаты


В сухом остатке мы получили следующие преимущества:

  • Одна таблица вместо 90
  • Служебные запросы исполняются за миллисекунды
  • Стоимость уменьшилась в два раза
  • Простое удаление дубликатов событий

Есть и недостатки, к которым мы готовы:

  • В случае аварии, чинить кластер придётся самостоятельно
  • Изменения схемы теперь нужно производить на каждом хосте отдельно
  • Обновляться на новые версии придётся своими силами

Сравнивать скорость запросов в лоб мы не можем, так как существенно изменилась схема данных. Многие запросы стали быстрее, просто из-за того, что читают меньший объём данных с диска. По-хорошему, такое изменение нужно было сделать ещё в Redshift, но было решено совместить с миграцией на ClickHouse.

Вся миграция вместе с подготовкой заняла около трёх месяцев. Шла она с начала июля по конец сентября и потребовала участия двух человек. 27-го сентября, мы выключили Redshift и с тех пор работаем только на ClickHouse. Получается, уже чуть более двух месяцев. Срок небольшой, но пока ни разу не натолкнулись на потерю данных или критичный баг, из-за которого весь кластер бы встал. Впереди нас ждут обновления на новые версии!

Комментарии (10)


  1. kkirsanov2
    19.12.2018 12:31

    Думаю тут будет много читателей знакомых с CH так что задам вопрос:

    Есть одна микросервисная архитектура. Пара десятков микросервисов шлют друг-другу сообщения через кафку. В день набегает 10-20 млн сообщений. Для мониторинга этого вся важная информация из сообщений как есть (т.е. прямо транзакционка) перекладывается в InfluxDB (бесплатная) и заводится в графину, где уже выборки-фильтрации настраиваются.

    Есть в CH то, ради чего в этой схеме следует InfluxDB заменить на CH?


    1. alexzaitsev
      19.12.2018 16:33

      Думать о замене надо лишь в том случае, если что-то не устраивает в текущем решении. Авторы статьи описали свои проблемы с RedShift, ради которых они затеяли переезд на ClickHouse. А какая мотивация у вас?


      1. kkirsanov2
        19.12.2018 17:38

        >Думать о замене надо лишь в том случае, если что-то не устраивает в текущем решении.
        Уязвимая позиция.
        Как я узнаю что меня что-то не устраивает, если не узнавать новое о других системах?

        Я так, например пол жизни думал что раз левая рука слабее правой, то и нормально что левое ухо слышит хуже правого.

        > А какая мотивация у вас?
        Репликация, контроль схемы, более живое сообщество.


  1. Lachezis
    19.12.2018 13:34

    Вы пробовали использовать встроенные инструменты AWS для сбора событий типа Kinesis Data Stream и Kinesis Firehose?


    Интересно сравнение по стоимости и масштабированию с собственным решением.


    Спасибо за статью!


    1. kubus Автор
      19.12.2018 13:43

      Да, смотрели на Kinesis, но решили что s3 и локальные файлы гораздо проще, а чем проще тем лучше.


  1. amarao
    19.12.2018 15:40

    … Если кому-то интересно, краткий отчёт о внедрении ClickHouse с точки зрения оператора.

    На входе: примерно 5 мегабит/с данных, непрерывным потоком, батчами раз в 2-3 минуты с нескольких источников. Размер базы — 20Гб, авторотация на неделю. Фиксированный набор аналитики по данным (посчитать, найти топ, etc).

    Внедрили. 153 дня аптайма, 0 проблем за это время. Я внедрил и забыл, программисты довольны. Мониторинг — место на сервере, память на сервере, живость clickhouse'а.

    Отчёт мониторинга за 6 месяцев:

    OK: 99.997%
    Scheduled: 0.003%
    Unscheduled: 0.000%


  1. crazyproger
    19.12.2018 18:51

    Подскажите пожалуйста, какой Grafana плагин использовали для вывода текущих активных запросов?


    1. kubus Автор
      19.12.2018 18:52

      А он вроде бы один всего, вот этот grafana.com/plugins/vertamedia-clickhouse-datasource


  1. Pavel_L
    20.12.2018 02:54

    Вопрос: рассматривали ли вариант миграции на SnowFlake вместо ClickHouse, и если да — какие аргументы «против» перевесили?


    1. kubus Автор
      20.12.2018 10:57

      Нет, не рассматривали