В предыдущей статье мы рассказали про перенос разработки в единый репозиторий с trunk-based подходом к разработке, с едиными системами сборки, тестирования, деплоя и мониторинга, про то, какие задачи должна решать система непрерывной интеграции для эффективной работы в таких условиях.
Сегодня мы расскажем читателям Хабра про устройство системы непрерывной интеграции.
Система непрерывной интеграции должна работать надежно и быстро. Система должна быстро реагировать на поступающие события и не должна вносить дополнительных задержек в процесс доставки результатов запуска тестов до пользователя. Результаты сборки и тестирования нужно доставлять до пользователя в режиме реального времени.
Система непрерывной интеграции представляет из себя систему потоковой обработки данных с минимальными задержками.
После отправки всех результатов по определенной стадии (configure, build, style, small tests, medium tests и т.д.) система сборки сигнализирует об этом в систему непрерывной интеграции ("закрывает" стадию), и пользователь видит, что для данной проверки и данной стадии известны все результаты. Каждая стадия закрывается независимо. Пользователь быстрее получает полезный сигнал. После закрытия всех стадий проверка считается завершенной.
Для реализации системы мы выбрали Kappa архитектуру. Система состоит из 2-х подсистем:
- Обработка событий и данных происходит в realtime контуре. Любые входные данные обрабатываются как потоки данных (стримы). Сначала события записывается в стрим и только затем осуществляется их обработка.
- Результаты обработки данных непрерывно записываются в базу данных, куда затем идут обращения через API. В Kappa архитектуре это называется serving layer.
Все запросы на модификацию данных должны проходить через realtime контур, поскольку там всегда нужно иметь актуальное состояние системы. Запросы на чтение идут только в базу данных.
Везде, где можно, мы следуем правилу "append-only". Никаких модификаций и удалений объектов, за исключением удаления старых, ненужных данных.
За день через сервис проходит более 2 Тб сырых данных.
Преимущества:
- Стримы содержат все события и сообщения. Мы всегда можем понять что и когда произошло. Стрим можно воспринимать как большой лог.
- Высокая эффективность и минимальные накладные расходы. Получается полностью событийно-ориентированная система, без каких-либо потерь на polling'е. Нет события — ничего лишнего не делаем.
- Прикладной код практически не имеет дела с примитивами синхронизации потоков и памятью, разделяемой между потоками. Это делает систему более надежной.
- Процессоры хорошо изолированы друг от друга, т.к. не взаимодействуют напрямую, только через стримы. Можно обеспечить хорошее тестовое покрытие.
Но потоковая обработка данных не так проста:
- Требуется хорошее понимание вычислительной модели. Вам придётся переосмыслить существующие алгоритмы обработки данных. Не все алгоритмы с ходу эффективно ложатся на модель стримов и придётся немного поломать голову.
- Необходимо гарантировать сохранение порядка поступления и обработки событий.
- Нужно уметь обрабатывать взаимосвязанные события, т.е. иметь быстрый доступ ко всем необходимым данным во время обработки нового сообщения.
- Также нужно уметь обрабатывать дубликаты событий.
Потоковая обработка данных (stream processing)
Во время работы над проектом была написана библиотека Stream Processor, которая помогла нам в короткие сроки реализовать и запустить в production потоковые алгоритмы обработки данных.
Stream Processor — это библиотека для построения систем потоковой обработки данных. Стрим — потенциально бесконечная последовательность данных (сообщений), в которую возможно лишь добавление новых сообщений, уже записанные сообщения не изменяются и не удаляются из стрима. Преобразователи одних стримов в другие (процессоры стримов) функционально состоят из трёх частей: поставщика входящих сообщений, который обычно вычитывает сообщения из одного или нескольких стримов и складывает их в очередь на обработку, процессора сообщений, который преобразует входящие сообщения в исходящие и складывает их в очередь на запись, и писателя, где сгруппированные в рамках временного окна исходящие сообщения попадают в выходные стримы. Данные сообщений, сгенерированных одним процессором стримов, могут в дальнейшем быть использованы другими. Таким образом, стримы и процессоры образуют ориентированный граф, в котором возможно наличие циклов, в частности процессор стрима может даже генерировать сообщения в тот же стрим, откуда он берёт данные.
Гарантируется, что каждое сообщение входного стрима будет обработано каждым связанным с ним процессором хотя бы один раз (семантика at least once). При этом также гарантируется, что все сообщения будут обработаны в том порядке, в котором они поступали в этот стрим. Для этого процессоры стримов распределяются по всем работающим узлам сервиса, так чтобы в один момент времени работало не более одного экземпляра каждого из зарегистрированных процессоров.
Обработка взаимосвязанных событий — одна из основных проблем при построении систем потоковой обработки данных. Как правило, при потоковой обработке сообщений процессоры стримов инкрементально создают некое состояние, валидное на момент обработки текущего сообщения. Такие объекты-состояния обычно связаны не со всем стримом в целом, а с неким его подмножеством сообщений, которое определяется значением ключа в этом стриме. Эффективное хранение состояния — ключ к успеху. При обработке очередного сообщения процессору важно уметь быстро получить это состояние и на основании него и текущего сообщения сгенерировать исходящие сообщения. Эти объекты-состояния доступны процессорам в L1 (просьба не путать с кэшем CPU) LRU-кэше, который размещается в памяти. В случае, если в L1-кэше состояния не оказалось, оно восстанавливается из L2-кэша, размещенного в том же хранилище, где хранятся стримы, и куда оно периодически сохраняется во время работы процессора. Если же и в L2-кэше состояния не нашлось, то оно восстанавливается из исходных сообщений стрима, как если бы процессор обработал все исходные сообщения, связанные ключом текущего сообщения. Техника кэширования также позволяет бороться с проблемой высокой латентности хранилища, так как зачастую последовательный процессинг упирается не в производительность сервера, а в задержки запросов и ответов при общении с хранилищем данных.
Чтобы эффективно хранить в памяти данные в L1-кэшах и данные сообщений, помимо memory-efficient структур, мы используем пулы объектов, которые позволяют иметь в памяти только одну копию объекта (или даже его части). Такая техника уже применяется в JDK для строк string interning и похожим образом распространяется на другие типы объектов, которые при этом должны быть иммутабельными.
Для компактного хранения данных в хранилище стримов некоторые данные нормализуются перед записью в стрим, т.е. превращаются в числа. К числам (идентификаторы объектов) далее можно применить эффективные алгоритмы сжатия. Числа сортируются, считаются дельты, далее кодирование с помощью ZigZag Encoding и затем сжатие архиватором. Нормализация является не совсем стандартной техникой для систем потоковой обработки данных. Но данная техника сжатия очень эффективна и объём данных в самом нагруженном стриме сокращается примерно в 1?000 раз.
Для каждого стрима и процессора мы отслеживаем жизненный цикл обработки сообщений: появление новых сообщений во входном стриме, размер очереди необработанных сообщений, размер очереди на запись в результирующий стрим, время обработки сообщений и распределение времени по стадиям обработки сообщений:
Хранилище данных
Результаты потоковой обработки данных должны быть доступны пользователю как можно скорее. Обработанные данные из стримов должны непрерывно записываться в базу данных, куда затем можно обратиться за данными (например, показать отчет с результатами проверки, показать историю теста).
Характеристики хранимых данных и запросов.
Большая часть данных — запуски тестов. За месяц происходит более 1.5 миллиарда запусков сборок и тестов. По каждому запуску хранится довольно большое количество информации: результат и тип ошибки, краткое описание ошибки (сниппет), несколько ссылок на логи, продолжительность теста, набор числовых значений, метрик, в формате имя=значение и т.п. Часть этих данных — например, метрики и продолжительность — очень плохо поддаётся сжатию, так как фактически представляет собой случайные значения. Другая же часть — например, результат, тип ошибки, логи — могут быть сохранены более эффективно, поскольку почти не меняются у одного и того же теста от запуска к запуску.
Ранее для хранения обработанных данных мы использовали MySQL. Мы постепенно начинали упираться в возможности базы данных:
- Объем обрабатываемых данных удваивается каждые полгода.
- Мы могли хранить данные только за 2 последних месяца, а хотели хранить данные минимум за год.
- Проблемы со скоростью выполнения некоторых тяжелых (близким к аналитическим) запросов.
- Усложненная схема базы данных. Много таблиц (нормализация), что усложняет запись в базу данных. Схема базы сильно отличается от схемы объектов, используемых в realtime контуре.
- Не переживает отключение сервера. Поломка отдельного сервера или отключение дата-центра может привести к поломке системы.
- Достаточно сложная эксплуатация.
В качестве кандидатов для нового хранилища данных мы рассматривали несколько вариантов: PostgreSQL, MongoDB и несколько внутренних решений, включая ClickHouse.
Некоторые решения не позволяют хранить наши данные эффективнее, чем старое решение на базе MySQL. Другие не позволяют реализовать быстрые и сложные (почти аналитические) запросы. Например, у нас есть достаточно тяжелый запрос, который показывает коммиты, влияющие на определенный проект (некоторый набор тестов). Во всех случаях, где мы не можем выполнять быстрые SQL запросы, нам пришлось бы заставлять пользователя долго ждать или делать некоторые вычисления заранее с потерей гибкости. Если считать что-то заранее, то для этого нужно писать больше кода и при этом теряется гибкость – нет возможности быстро изменить поведение и что-нибудь пересчитать. Намного удобнее и быстрее написать SQL запрос, который вернет нужные пользователю данные и иметь возможность быстро его модифицировать, если захочется изменить поведение системы.
ClickHouse
Мы остановили свой выбор на ClickHouse. ClickHouse – столбцовая система управления базами данных (СУБД) для онлайн обработки аналитических запросов (OLAP).
Переходя на ClickHouse, мы сознательно отказались от некоторых возможностей, предоставляемых другими СУБД, получив за это более чем достойную компенсацию в виде очень быстрых аналитических запросов и компактного хранилища данных.
В реляционных СУБД значения, относящиеся к одной строке, физически хранятся рядом. В ClickHouse значения из разных столбцов хранятся отдельно, а данные одного столбца — вместе. Такой порядок хранения данных позволяет обеспечить высокую степень сжатия данных при правильном выборе первичного ключа. Это также влияет на то в каких сценариях СУБД будет хорошо работать. ClickHouse лучше работает с запросами, где читается небольшое количество столбцов и в запросе используется одна большая таблица, а остальные таблицы маленькие. Но даже в неаналитических запросах ClickHouse может показывать хорошие результаты.
Данные в таблицах отсортированы по первичному ключу. Сортировка выполняется в фоновом режиме. Это позволяет создавать разреженный индекс небольшого объёма, который позволяет быстрее находить данные. В ClickHouse нет вторичных индексов. Строго говоря, есть один вторичный индекс — ключ партиционирования (ClickHouse отсекает данные по партициям там, где ключ партиционирования указан в запросе). Подробнее.
Нефункциональна схема данных с нормализацией, наоборот, данные предпочтительно денормализовывать в зависимости от запросов к ним. Предпочтительнее создавать "широкие" таблицы с большим количеством столбцов. Этот пункт также связан с предыдущим, потому что отстутствие вторичных индексов заставляет иногда создавать копии таблиц, использующих другой первичный ключ.
В ClickHouse нет UPDATE и DELETE в классическом смысле, однако есть возможность их эмуляции.
Данные необходимо вставлять большими блоками и не слишком часто (раз в несколько секунд). Построчная загрузка данных фактически неработоспособна на реальных объёмах данных.
ClickHouse не поддерживает транзакции, система становится eventually consistent.
Тем не менее некоторые возможности ClickHouse, сходные с другими СУБД, позволяют облегчить перевод существующих систем на неё.
- ClickHouse использует SQL, но с небольшими отличиями, полезными для запросов, типичных в OLAP-системах. Имеется мощная система агрегатных функций, ALL/ANY JOIN, lambda-выражения в функциях и другие расширения SQL, позволяющие написать практически любой аналитический запрос.
- ClickHouse поддерживает репликацию, кворумную запись, кворумное чтение. Кворумная запись необходима для надежного сохранения данных: INSERT завершается успешно только в том случае, когда ClickHouse смог без ошибки записать данные на заданное число реплик.
Подробнее про особенности ClickHouse можно почитать в документации.
Особенности работы с ClickHouse
Выбор первичного ключа и ключа партиционирования.
Как выбрать первичный ключ и ключ партиционирования? Пожалуй, это первый вопрос, который возникает при создании новой таблицы. Выбор первичного ключа и ключа партиционирования как правило диктуется запросами, которые будут выполняться над данными. При этом наиболее эффективными оказываются запросы, которые используют оба условия: и по первичному ключу, и по ключу партиционирования.
В нашем случае главные таблицы — матрицы запусков тестов. Логично предположить, что при такой структуре данных ключи надо выбрать так, чтобы порядок обхода одного из них шел в порядке увеличения номера строки, а порядок обхода другого — в порядке увеличения номера столбца.
Важно также иметь ввиду, что выбор первичного ключа может драматически влиять на компактность хранения данных, так как идущие в порядке обхода первичного ключа одинаковые значения в других столбцах почти не занимают места в таблице. Так в нашем случае, например, состояния тестов мало меняются от коммита от коммиту. Этот факт по сути предопределил выбор первичного ключа — пары идентификатор теста и номер коммита. Причем именно в таком порядке.
У ключа партиционирования два назначения. С одной стороны, он позволяет партициям становиться «архивными», чтобы их можно было удалить из хранилища насовсем, поскольку данные в них уже устарели. С другой стороны, ключ партиционирования является вторичным индексом, а значит, позволяет ускорить запросы, если выражение по нему присутствует в них.
Для наших матриц выбор номера коммита в качестве ключа партиционирования кажется вполне естественным. Но если задать в выражении для ключа партиционирования значение ревизии, то партиций в такой таблице будет неоправданно много, что приведет к деградации производительности запросов к ней. Поэтому в выражении для ключа партиционирования значение ревизии можно разделить на какое-то большое число, чтобы снизить число партиций, например, PARTITION BY intDiv(revision, 2000). Это число должно быть достаточно велико, чтобы число партиций не превысило рекомендуемые значения, при этом оно должно быть достаточно мало, чтобы в одну партицию попало не очень много данных и БД не пришлось бы вычитывать слишком много данных.
Как реализовать UPDATE и DELETE?
В обычном понимании UPDATE и DELETE в ClickHouse не поддерживаются. Однако вместо UPDATE и DELETE можно добавить в таблицу колонку с версией и воспользоваться специальным движком ReplacingMergeTree (выполняет удаление дублирующихся записей с одинаковым значением первичного ключа). В некоторых случаях версия будет естественным образом присутствовать в таблице с самого начала: например, если мы захотим завести таблицу для текущего состояния теста, версией в этой таблице будет номер коммита.
CREATE TABLE current_tests (
test_id UInt64,
value Nullable(String),
version UInt64
) ENGINE = ReplacingMergeTree(version) ORDER BY test_id
В случае изменения записи добавляем версию с новым значением, в случае удаления — со значением NULL (или каким-либо другим специальным значением, которое не может встретиться в данных).
Чего получилось добиться с новым хранилищем?
Одной из главных целей перехода на ClickHouse была возможность хранить историю тестов за длительный промежуток времени (несколько лет или хотя бы год в худшем случае). Уже на этапе прототипа стало ясно, что мы сможем обойтись существующими в наших серверах SSD для хранения как минимум трёхлетней истории. Значительно ускорились аналитические запросы, теперь мы можем извлечь гораздо больше полезной информации из наших данных. Повысился запас прочности по RPS. Причем эта величина практически линейно масштабируется добавлением новых серверов в кластер ClickHouse. Создание нового хранилища данных базе ClickHouse — это всего лишь едва заметный конечному пользователю шаг на пути к более важной цели — добавлению новых возможностей, ускорению и упрощению разработки, благодаря способность хранить и обрабатывать большие объёмы данных.
Приходите к нам
Наш отдел постоянно расширяется. Приходите к нам, если хотите работать над сложными и интересными задачами и алгоритмами. Если есть вопросы, можно задавать прямо мне в личку.
Полезные ссылки
Stream Processing
- The Log: What every software engineer should know about real-time data's unifying abstraction.
- The world beyond batch: Streaming 101.
- Книга Designing Data-Intensive Applications — O'Reilly Media.
Kappa architecture
ClickHouse:
Комментарии (3)
gecube
29.11.2018 09:18Stream Processor — а почему не kafka? Я просто обращаю внимание, что Яндекс для всего строит свой «велосипед». Не удивлюсь, если внутри Я еще и не git/svn, а свой протокол и своя система для хранения репозиториев. Я понимаю, что это с одной стороны позволяет не зависеть от внешних компонентов. Но это такое себе. Все равно ведь приходится наверняка работать со стандартными ОСями и пакетами. А с другой стороны — каковы затраты на поддержание этих своих решений!?!?!?!?
alhel
Планируете ли вы выложить Stream Processor на git?
amkruglov Автор
Сейчас Stream Processor тесно завязан на наши внутренние компоненты. Если они поедут в open source, то мы тоже сможем.