Netflix продолжает расширять бизнес и диверсифицироваться в различных направлениях, вроде доставки видео по запросу и гейминга. В результате всё важнее становятся технологии, обеспечивающие загрузку временных (темпоральных) данных в системы компании и их хранение. Речь идёт об огромных объёмах данных, измеряемых петабайтами. А задержки доступа к этим данным должны укладываться в миллисекунду. В предыдущих материалах мы рассказывали о нашем слое абстракции для хранения данных типа «ключ-значение», и о платформе, реализующий возможности шлюза данных. И то и другое — это неотъемлемые части подсистемы, отвечающей в Netflix за работу с данными. Система хранения данных типа «ключ-значение» — это гибкое и хорошо масштабируемое решение для работы со структурированными данными соответствующего формата. А шлюз данных — это платформа, которая даёт компании базовую инфраструктуру, обеспечивающую защиту, настройку, развёртывание компонентов, ориентированных на работу с данными.
Опираясь на эти базовые абстракции, мы разработали слой абстракции для хранения данных, представленных временными рядами (TimeSeries Abstraction). Это — универсальное и хорошо масштабируемое решение, спроектированное в расчёте на эффективную работу с большими объёмами временных данных, описывающих жизненные циклы различных событий. Задержки доступа к данным в этой системе крайне малы и измеряются миллисекундами. Она отличается экономической эффективностью и может использоваться в самых разных сценариях, предполагающих работу с временными рядами.
В этом материале мы поговорим об архитектуре, о принципах проектирования и о реальных способах применения TimeSeries Abstraction. Мы продемонстрируем то, как эта система расширила возможности нашей платформы по управлению темпоральными данными в масштабах Netflix.
Обратите внимание на то, что система TimeSeries Abstraction, несмотря на её название, не проектировалась как темпоральная база данных общего назначения. Мы не используем её для хранения данных, связанных с метриками, какими-то гистограммами, таймерами. То же относится и к другим аналитическим данным, обрабатываемым почти в режиме реального времени. Работу с подобными данными прекрасно поддерживает наша система телеметрии Netflix Atlas. В случае с TimeSeries мы ориентированы на решение задач хранения и извлечения из хранилища неизменяемых темпоральных данных, описывающих жизненные циклы событий. Эти задачи нам надо решать, учитывая необходимость в чрезвычайно высокой пропускной способности системы, в низких задержках доступа к данным и в экономической эффективности системы.
Важнейшие задачи, встающие перед Netflix при работе с темпоральными данными
В Netflix постоянно генерируются и используются темпоральные данные. Они могут иметь отношение к взаимодействиям с пользователями, например — генерироваться при возникновении событий, связанных с проигрыванием видео. Они описывают то, что просматривают пользователи, когда не проигрывают видео. Они применяются для описания сложных сетевых процессов, относящихся к микросервисам. Мы заинтересованы в эффективной работе с этими данными в масштабах компании. Эта работа направлена на извлечение из них различных ценных сведений, которые чрезвычайно важны для обеспечения оптимального уровня удовлетворённости пользователей и надёжности системы.
Когда речь идёт о хранении подобных данных и о выполнении запросов к ним, появляется уникальный набор сложных и интересных задач, которые нам приходится решать. Вот что нас, в частности, интересует:
Высокая пропускная способность системы. Выполнение до 10 миллионов операций записи в секунду с поддержкой высокого уровня доступности данных.
Эффективное выполнение запросов к большим наборам данных. Хранение петабайтов данных и обеспечение, при чтении данных по первичному ключу, низких задержек, выражающихся, в миллисекундах, небольшими двузначными значениями. Поддержка операций поиска и агрегации данных по множеству вторичных атрибутов.
Поддержка операций чтения и записи данных в глобальном масштабе. Обеспечение проведения операций чтения и записи, инициированных из любого места в мире. Наличие настраиваемых моделей согласованности данных.
Настраиваемая конфигурация наборов данных. Наличие возможности разбиения наборов данных на разделы, проводимого либо в одноарендном, либо в многоарендном хранилище данных. Здесь же нас интересуют возможности по настройке различных параметров наборов данных — таких, как согласованность данных и время их хранения.
Возможность обработки трафика в условиях пиковых нагрузок. Устойчивость к значительным пикам трафика, которые появляются во время событий, вызывающих повышенный интерес у пользователей, или во время сбоев, затрагивающих целые регионы присутствия Netflix.
Экономическая эффективность. Снижение стоимости использования системы, определяемой в расчёте на один байт обрабатываемой информации и в расчёте на выполнение одной операции. Это нужно для оптимизации долговременного хранения данных при минимизации расходов на поддержку инфраструктуры. В случае с Netflix подобные расходы исчисляются миллионами долларов.
Абстракция для хранения временных рядов
Сервис TimeSeries Abstraction был разработан так, чтобы соответствовать определённым требованиям. В его основе лежат следующие основные архитектурные принципы:
Разбиение наборов данных на разделы. Данные разбивают на разделы, используя уникальную стратегию секционирования темпоральных данных, скомбинированную с подходом, предусматривающим распределение событий по контейнерам. Это нужно для обеспечения эффективной работы системы в условиях пиковых нагрузок, а так же — для упрощения и ускорения выполнения запросов.
Поддержка гибкого подхода к хранению данных. Сервис спроектирован так, чтобы он мог бы интегрироваться с различными базовыми хранилищами данных, в состав которых входят Apache Cassandra и Elasticsearch. Это позволяет Netflix подбирать решение для хранения данных, опираясь на требования конкретного сценария использования хранилища.
Конфигурируемость сервиса. Абстракция TimeSeries поддерживает целый ряд настраиваемых параметров, имеющих отношение к каждому из наборов данных. Это даёт системе гибкость, необходимую для обеспечения потребностей широкого разнообразия вариантов её использования.
Масштабируемость сервиса. Наша архитектура поддерживает и горизонтальное, и вертикальное масштабирование. Это позволяет системе обрабатывать постоянно возрастающие объёмы данных и обеспечивать всё большую и большую пропускную способность по мере того, как Netflix расширяет свою пользовательскую базу и набор используемых компанией сервисов.
Общая инфраструктура. Использование платформы, реализующей возможности шлюза данных (Data Gateway Platform), позволяет нам разворачивать одноарендные или многоарендные инфраструктурные решения, обеспечивая необходимые для них параметры, касающиеся ограничения доступа к данным и изоляции трафика.
Поговорим о различных аспектах сервиса TimeSeries Abstraction.
Модель данных
Мы применяем уникальную модель данных о событиях. Она включает в себя все сведения о событиях, которые нас интересуют, позволяя выполнять эффективные запросы к этим сведениям.
Начнём с наименьшего элемента данных, используемого в нашей абстракции, и, разобравшись с ним, пойдём дальше.
Элемент данных события (Event Item). Элемент данных события — это данные типа «ключ-значение», которые пользователь применяет для того, чтобы сохранить данные о конкретном событии. Например:
{“device_type”: “ios”}
.Событие (Event). Событие — это структурированная коллекция, включающая в себя один элемент данных события или большее их количество. Событие происходит в определённый момент времени. Оно идентифицируется временной меткой, генерируемой клиентом (
event_time
), а так же — идентификатором события (event_id
), в роли которого может выступать UUID. Эта комбинация изevent_time
иevent_id
, кроме того, формирует часть уникального ключа идемпотентности события, позволяя пользователям безопасно повторять запросы.Идентификатор временного ряда (Time Series ID). Сущность
time_series_id
— это коллекция, состоящая из одного или нескольких событий, существующая в рамках времени хранения набора данных. Например, идентификатор временного рядаdevice_id
может применяться для хранения всех событий, имеющих отношение к конкретному устройству, произошедших в период хранения данных. Все события являются иммутабельными, а сервис TimeSeries, выполняя операции записи, лишь добавляет новые события к коллекциям, идентифицируемымtime_series_id
.Пространство имён (Namespace). Пространство имён — это коллекция идентификаторов временных рядов и данных, описывающих события, представляющая собой полный набор данных TimeSeries. Пользователи могут создавать необходимое количество пространств имён для каждой из своих задач. Наша абстракция позволяет задавать различные конфигурируемые параметры на уровне пространства имён. Мы поговорим об этом позже, когда будем обсуждать плоскость управления сервиса.
API
Абстракция TimeSeries предоставляет следующие API, позволяющие взаимодействовать с данными.
WriteEventRecordsSync: эта конечная точка записывает переданный ей пакет событий и отправляет в ответ клиенту подтверждение получения и сохранения данных. Это подтверждение используется в тех случаях, когда пользователю нужна гарантия устойчивости данных, отправленных им в систему.
WriteEventRecords: это — версия вышеописанного API, работающая по принципу «выстрелил и забыл». Эта конечная точка помещает в очередь пакет событий и обрабатывает их, не отправляя клиентам подтверждений получения и сохранения данных. Используется она для решения таких задач, как логирование или трассировка, когда пользователей больше заботит пропускная способность системы, а не риск потерять небольшой объём данных.
{
"namespace": "my_dataset",
"events": [
{
"timeSeriesId": "profile100",
"eventTime": "2024-10-03T21:24:23.988Z",
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"eventItems": [
{
"eventItemKey": "deviceType",
"eventItemValue": "aW9z"
},
{
"eventItemKey": "deviceMetadata",
"eventItemValue": "c29tZSBtZXRhZGF0YQ=="
}
]
},
{
"timeSeriesId": "profile100",
"eventTime": "2024-10-03T21:23:30.000Z",
"eventId": "123e4567-e89b-12d3-a456-426614174000",
"eventItems": [
{
"eventItemKey": "deviceType",
"eventItemValue": "YW5kcm9pZA=="
}
]
}
]
}
ReadEventRecords: эта конечная точка, получает комбинацию из аргументов namespace
, timeSeriesId
, timeInterval
, в состав которых может входить необязательный аргумент eventFilters. Получив их, она возвращает все подходящие события, отсортированные в нисходящем порядке по полю event_time
. Работает она с очень низкими задержками, измеряемыми миллисекундами.
{
"namespace": "my_dataset",
"timeSeriesId": "profile100",
"timeInterval": {
"start": "2024-10-02T21:00:00.000Z",
"end": "2024-10-03T21:00:00.000Z"
},
"eventFilters": [
{
"matchEventItemKey": "deviceType",
"matchEventItemValue": "aW9z"
}
],
"pageSize": 100,
"totalRecordLimit": 1000
}
SearchEventRecords: эта конечная точка, получая критерий поиска и временной интервал, возвращает все подходящие события. Она подходит для тех случаев, когда пользователя устроит чтение данных уровень согласованности которых описывается моделью «согласованность в “конечном счёте”».
{
"namespace": "my_dataset",
"timeInterval": {
"start": "2024-10-02T21:00:00.000Z",
"end": "2024-10-03T21:00:00.000Z"
},
"searchQuery": {
"booleanQuery": {
"searchQuery": [
{
"equals": {
"eventItemKey": "deviceType",
"eventItemValue": "aW9z"
}
},
{
"range": {
"eventItemKey": "deviceRegistrationTimestamp",
"lowerBound": {
"eventItemValue": "MjAyNC0xMC0wMlQwMDowMDowMC4wMDBa",
"inclusive": true
},
"upperBound": {
"eventItemValue": "MjAyNC0xMC0wM1QwMDowMDowMC4wMDBa"
}
}
}
],
"operator": "AND"
}
},
"pageSize": 100,
"totalRecordLimit": 1000
}
AggregateEventRecords: эта конечная точка, получая критерии поиска и сведения о необходимом режиме агрегации данных (например — DistinctAggregation
), выполняет запрошенную у неё агрегацию данных в пределах заданного временного интервала. Пользователь этой конечной точки может допустить применение модели согласованности данных «согласованность в “конечном счёте”» и достаточно большие, измеряемые секундами, задержки. Это напоминает то, как организована работа с конечной точкой Search
.
{
"namespace": "my_dataset",
"timeInterval": {
"start": "2024-10-02T21:00:00.000Z",
"end": "2024-10-03T21:00:00.000Z"
},
"searchQuery": {...some search criteria...},
"aggregationQuery": {
"distinct": {
"eventItemKey": "deviceType",
"pageSize": 100
}
}
}
Ниже поговорим о том, как мы взаимодействуем с этими данными в слое хранения данных.
Слой хранения данных
Слой хранения данных для абстракции TimeSeries состоит из основного хранилища данных и необязательного хранилища индексов. Основное хранилище обеспечивает устойчивость хранения данных при выполнении операций записи и используется для выполнения основных операций чтения. А хранилище индексов используется для выполнения операций поиска и агрегирования данных. В Netflix предпочтительным вариантом для устойчивого хранения данных в условиях, когда требуется высокая пропускная способность хранилища, является Apache Cassandra. А для хранения индексов мы обычно используем Elasticsearch. Правда, и это похоже на наш подход к работе с API, слой хранения данных не привязан именно к этим хранилищам. Мы, вместо жёсткой привязки, определяем контакты API хранения данных, выполнение которых позволяет нам работать с любыми базовыми хранилищами. Это даёт системе гибкость, позволяя, при необходимости, менять базовые хранилища.
Основное хранилище данных
В этом разделе поговорим о том, как мы используем Apache Cassandra для решения тех задач, которые стоят перед абстракцией TimeSeries.
Схема разбиения наборов данных на разделы
Учитывая масштабы Netflix, постоянный приток данных, описывающих события, способен быстро перегрузить обычные базы данных. Эту проблему решает применяемая нами стратегия секционирования темпоральных данных. А именно, речь идёт о том, что данные делятся на фрагменты, которыми легко управлять. Разделение ведётся по временным интервалам. Например, могут использоваться интервалы, равные часу, дню, месяцу. Такой подход помогает организовать эффективное выполнение запросов к данным, принадлежащих к определённому временному интервалу, избавляя нас от необходимости сканировать весь набор данных. Он, кроме того, позволяет Netflix эффективно архивировать, сжимать или удалять старые данные, оптимизируя и хранение данных, и выполнение запросов. Кроме того, секционирование данных позволяет минимизировать проблемы производительности, обычно связанные с применением в Cassandra «широких» разделов хранилищ. Задействуя эту стратегию, мы можем гораздо эффективнее использовать место на дисках, так как она снижает необходимость в резервировании огромных объёмов пространства для выполнения операций объединения таблиц и фактического удаления данных. Это позволяет нам экономить.
Вот как это выглядит:
Временной срез (Time Slice). Временной срез — это единица хранения данных, которая прямо соответствует таблице Cassandra. Мы создаём множество подобных временных срезов, каждый из которых покрывает определённый временной интервал. Событие попадает в один из срезов в соответствии со своим свойством event_time
. Срезы следуют один за другим без разрывов во времени, а применяемые операции работают с диапазонами, в которые входит их начальная граница, а конечная граница — не входит. Это обеспечивает такое поведение системы, при котором все события, так или иначе, попадают в какой-то из временных срезов. Используя временные срезы, мы можем эффективно удалять те данные, период хранения которых закончился. Для удаления данных некоего временного среза достаточно удалить соответствующую таблицу. Это снижает наши потребности в пространстве для хранения данных и позволяет экономить.
Почему бы не использовать подход к определению времени жизни объекта (Time-To-Live, TTL), основанный на строках базы данных?
Использование понятия времени жизни объекта в применении к индивидуальным событиям создаст в Cassandra серьёзное количество объектов, предназначенных для удаления (tombstone). Это приведёт к ухудшению производительности. Особенно — при выполнении сканирования диапазонов записей. Использование дискретных временных срезов и удаление их целиком позволяет нам полностью решить проблему объектов, помеченных для удаления. Единственный минус — это то, что данные могут храниться чуть дольше, чем необходимо. Дело в том, что до того, как таблицу можно будет удалить, весь временной диапазон этой таблицы должен выйти за пределы окна сохранения данных (retention window). Кроме того, установленные ранее сроки жизни объектов сложно перенастроить. А абстракция TimeSeries способна мгновенно увеличивать время хранения набора данных, используя всего одну операцию из уровня управления сервиса.
Распределение событий по контейнерам, соответствующим временным интервалам (Time Buckets). Внутри временного среза производится дальнейшее секционирование данных, их группировка по временным интервалам. Это способствует повышению эффективности сканирования диапазонов значений, позволяя нам выбирать конкретные группы событий, входящие во временной диапазон, необходимый для выполнения некоего запроса. Минус этого решения заключается в том, что если пользователь хочет прочесть все данные за большой период времени, нам необходимо сканировать множество разделов. Мы смягчаем проблему выполнения таких вот медленных операций, сканируя разделы параллельно, и, в итоге, агрегируя данные. В большинстве случаев плюсы ориентации на более мелкие подмножества данных перевешивают проблему «усиления чтения данных», возникающую, когда сначала читают множество фрагментов данных, а потом собирают эти данные воедино. Обычно пользователи читают небольшие подмножества данных, а не полные наборы актуальных данных.
Распределение событий по контейнерам событий (Event Buckets). Сгруппировав события по временным интервалам, мы распределяем их по контейнерам событий. Делается это для того, чтобы обеспечить нормальную обработку операций записи, требующих чрезвычайно высокой пропускной способности. Такие операции могут приводить к кратковременным всплескам запросов на запись данных в некий временной ряд. Это предотвращает перегрузку определённого раздела, рассчитанного на запись данных некоего временного интервала, и, кроме того, ещё сильнее уменьшает размеры разделов. Правда, достигается это ценой небольшого увеличения «усиления чтения данных».
Обратите внимание на то, что, начиная с Cassandra версии 4.x, мы заметили серьёзный рост производительности сканирования диапазонов данных в «широких» разделах. Ниже, когда мы будем говорить о наших планах на будущее, касающихся расширения TimeSeries Abstraction, вы можете найти рассказ о динамическом распределении связанных событий по контейнерам. Это действие направлено на то, чтобы воспользоваться выгодами, которые дают новые версии Cassandra.
Таблицы для хранения данных
Мы используем таблицы двух видов:
Таблицы данных: это временные срезы, в которых хранятся фактические данные событий.
Таблица метаданных: это таблица, которая хранит информацию о настройках временных срезов, относящихся к конкретным пространствам имён.
Таблицы данных
Ключ раздела (partition key) позволяет разделять события, относящиеся к time_series_id
, помещая их в один или несколько контейнеров time_bucket
и event_bucket
. Это позволяет избежать перегрузки разделов. При этом ключ кластеризации (clustering key) позволяет хранить на диске данные, отсортированные в том порядке, в котором мы, почти всегда, планируем их читать. Столбец value_metadata
хранит метаданные для event_item_value
— такие, как сведения о сжатии данных.
Запись в таблицу данных
Данные, записываемые пользователем, окажутся в соответствующем временном срезе, контейнере временного интервала и контейнере событий. То, где именно они окажутся, определяется показателем, вычисляемым на основе их свойства event_time
. Порядок его вычисления задаётся в настройках уровня управления конкретного пространства имён.
Например — порядок расчёта показателя time_bucket
может выглядеть так:
В ходе этого процесса система записи данных принимает решения о том, как именно обрабатывать данные до записи. Например — о необходимости их сжатия. Столбец value_metadata
содержит сведения об операциях постобработки данных, обеспечивая то, что система чтения данных сможет точно их интерпретировать.
Чтение из таблицы данных
На следующей иллюстрации показана общая схема параллельного чтения данных из нескольких разделов с их последующей сборкой в итоговый набор данных, который используется для возврата результата запроса.
Таблица метаданных
В этой таблице хранятся данные о настройках временных срезов для конкретных пространств имён.
Обратите внимание на следующее:
Сведения о временных срезах следуют друг за другом без разрывов во времени. Параметр
end_time
предыдущего среза перекрывается с параметромstart_time
следующего. Благодаря этому все события найдут своё место в хранилище.Попадание таблицы в окно сохранения данных. В таблице хранятся сведения о том, попадают ли таблицы, к которым имеют отношение метаданные, в окно сохранения данных.
Хранилище метаданных отличается гибкостью. Каждый из временных срезов можно снабдить собственными метаданными, что позволяет настраивать параметры временных срезов, которые попадут в систему в будущем, основываясь на выявленных паттернах работы с данными, характерных для текущего временного среза.
В столбце metadata
можно хранить гораздо больше сведений (например — настройки компактификации данных), но мы тут, для краткости, показываем лишь настройки разбиения данных на разделы.
Хранилище индексов
Для обеспечения работы дополнительных сценариев доступа к данным, предусматривающих работу с атрибутами, не являющимися уникальными идентификаторами записей, мы индексируем данные в Elasticsearch. Пользователи могут настраивать, для конкретных пространств имён, списки атрибутов, по которым они хотят проводить поиск данных и/или их агрегацию. Сервис извлекает эти поля из данных, описывающих события, по мере их поступления в систему, индексирует получившиеся документы и записывает их в Elasticsearch. Мы, в зависимости от пропускной способности, можем использовать Elasticsearch в роли обратного индекса, получая полные данные из Cassandra, или можем хранить в Elasticsearch полные версии исходных данных.
Обратите внимание на то, что, как и в случае с Cassandra, у пользователей нет прямого доступа к Elasticsearch. Пользователи, вместо этого, взаимодействуют с нашими API Search и Aggregate, которые преобразуют отправленные к ним запросы в форму, понятную базовому хранилищу.
Теперь поговорим о том, как мы настраиваем применяемые хранилища данных в расчёте на разных наборы данных.
Плоскость управления сервиса
Плоскость данных нашего сервиса отвечает за выполнение операций чтения и записи, а плоскость управления позволяет настраивать все аспекты поведения системы, относящиеся к определённому пространству имён. Плоскость данных обменивается данными со стеком управления абстракции TimeSeries, который контролирует конфигурационную информацию. Сам же стек управления, в свою очередь, взаимодействует с шардированной плоскостью управления системы Data Gateway Platform, которая обеспечивает контроль над конфигурациями, управляющими всеми абстракциями и пространствами имён.
Разделение сфер ответственности плоскости данных и плоскости управления помогает нам обеспечивать высокий уровень доступности плоскости данных. Дело в том, что плоскость управления отвечает за решение задач, которые могут требовать, в какой-либо форме, согласования структуры и модели данных с базовыми хранилищами информации.
Настройка пространств имён
Ниже показан пример настроек, демонстрирующий огромную гибкость нашего сервиса, а так же то, как мы настраиваем различные параметры пространств имён, используя плоскость управления.
"persistence_configuration": [
{
"id": "PRIMARY_STORAGE",
"physical_storage": {
"type": "CASSANDRA", // тип основного хранилища
"cluster": "cass_dgw_ts_tracing", // имя физического кластера
"dataset": "tracing_default" // соответствие пространству ключей
},
"config": {
"timePartition": {
"secondsPerTimeSlice": "129600", // ширина временного среза
"secondPerTimeBucket": "3600", // ширина временного интервала для группировки событий
"eventBuckets": 4 // количество событий, помещаемых в контейнер событий
},
"queueBuffering": {
"coalesce": "1s", // период объединения операций записи
"bufferCapacity": 4194304 // ёмкость очереди, измеряемая в байтах
},
"consistencyScope": "LOCAL", // один или несколько регионов
"consistencyTarget": "EVENTUAL", // согласованность чтения/записи
"acceptLimit": "129600s" // отрезок времени, уходящий в прошлое, в рамках которого разрешена запись
},
"lifecycleConfigs": {
"lifecycleConfig": [ // Параметры сохранения данных для основного хранилища
{
"type": "retention",
"config": {
"close_after": "1296000s", // закрытие для чтения/записи
"delete_after": "1382400s" // удаление временного среза
}
}
]
}
},
{
"id": "INDEX_STORAGE",
"physicalStorage": {
"type": "ELASTICSEARCH", // тип хранилища индексов
"cluster": "es_dgw_ts_tracing", // имя кластера ES
"dataset": "tracing_default_useast1" // имя базового индекса
},
"config": {
"timePartition": {
"secondsPerSlice": "129600" // ширина индексного среза
},
"consistencyScope": "LOCAL",
"consistencyTarget": "EVENTUAL", // как нужно читать/записывать данные
"acceptLimit": "129600s", // отрезок времени, уходящий в прошлое, в рамках которого разрешена запись
"indexConfig": {
"fieldMapping": { // поля, которые извлекают и помещают в индекс
"tags.nf.app": "KEYWORD",
"tags.duration": "INTEGER",
"tags.enabled": "BOOLEAN"
},
"refreshInterval": "60s" // Настройки, имеющие отношение к индексу
}
},
"lifecycleConfigs": {
"lifecycleConfig": [
{
"type": "retention", // Настройки параметров времени хранения индекса
"config": {
"close_after": "1296000s",
"delete_after": "1382400s"
}
}
]
}
}
]
Подготовка инфраструктуры к работе
Учитывая то, что у нас имеется так много разнообразных параметров, нам нужна автоматизированная система, позволяющая готовить к работе инфраструктуру хранилища и определять наилучшие настройки системы в расчёте на конкретную задачу. Когда пользователь хочет создать собственное пространство имён, он составляет список пожеланий, выдвигаемых его задачей. Автоматизированная система берёт этот список и преобразует его в конкретные настройки, касающиеся инфраструктуры хранилища и плоскости управления. Мы очень рекомендуем вам посмотреть это выступление с ApacheCon, где один из наших замечательных коллег рассказывает о том, как мы этого достигли. Возможно, в одном из следующих материалов, мы остановимся на этом подробнее.
После того, как система готовит к работе изначальный вариант инфраструктуры, она способна масштабировать его в ответ на запросы задачи пользователя. В следующем разделе мы расскажем о том, как это делается.
Масштабирование инфраструктуры
Во время подготовки пространств имён к работе пользователи могут обладать лишь ограниченными сведениями о том, что именно нужно для успешного решения их задачи. Из-за этого на начальном этапе работы система пытается обеспечить пользователям максимально доступное качество услуг. По прошествии некоторого времени рабочая нагрузка, характерная для задачи пользователя, может продемонстрировать потребность в более высокой пропускной способности хранилища. Вот как мы поступаем в подобных ситуациях:
Горизонтальное масштабирование. Экземпляры серверов TimeSeries могут, в соответствии с назначенными им политиками, автоматически масштабироваться и в сторону роста, и в сторону уменьшения мощности. Благодаря этому система способна обеспечить то, что нужно конкретной задаче. Ёмкость серверного хранилища можно, с помощью нашего планировщика ёмкости, пересчитать под меняющиеся требования
Вертикальное масштабирование. Мы, кроме того, можем принять решение о вертикальном масштабировании экземпляров серверов TimeSeries или серверов хранилищ данных. Делаем мы это для того, чтобы получить в своё распоряжение больше ресурсов процессора, памяти и/или большую ёмкость хранилищ данных.
Масштабирование дисковой подсистемы. Мы можем подключить к системе блочное хранилище (EBS, Amazon Elastic Block Store) в том случае, если планировщик отдаёт предпочтение инфраструктуре, предлагающей большую ёмкость хранилища за меньшую цену, а не твердотельным накопителям, обеспечивающим более низкие задержки. В подобных случаях мы развёртываем задания, масштабирующие объём EBS в том случае, если объём данных, хранящихся на дисках, достигает определённого предела, выражаемого в виде процентного показателя.
Переразбиение данных на разделы. Неточные оценки ресурсов, необходимых для решения задачи пользователя, могут привести к тому, что наборы данных окажутся разбиты на разделы не так, как нужно. Разделов может быть как слишком мало, так и слишком много. Плоскость управления TimeSeries может осуществлять настройку разбиения на разделы для будущих временных срезов. Делается это после того, как мы узнаём о том, какова природа данных, с которыми работаем, воспользовавшись гистограммами разбиения данных на разделы. В перспективе мы планируем поддерживать переразбиение на разделы уже сохранённых данных, а так же — динамическое разбиение на разделы для текущих данных.
Принципы проектирования системы
До сих пор мы говорили лишь о том, как абстракция TimeSeries хранит и настраивает наборы данных, описывающих события, а так же о том, как она взаимодействует с этими данными. А теперь порассуждаем о том, что можно сделать для того, чтобы увеличить производительность операций с данными и повысить качество обслуживания пользователей.
Идемпотентность событий
Мы встраиваем поддержку идемпотентности во все конечные точки, способные осуществлять мутации данных. Благодаря этому пользователи могут безопасно повторять запросы или выполнять хедж-запросы. Хеджирование запросов — это отправка клиентом на сервер конкурирующего запроса в том случае, если сервер никак не отреагировал на исходный запрос в тот промежуток времени, в который клиент ожидает получить ответ. После отправки хедж-запроса клиента устроит как сообщение об успешном выполнении исходного запроса, так и сообщение о выполнении хедж-запроса. Делается это для того, чтобы поддерживать «хвостовые задержки» запросов приложения на сравнительно низком уровне. Безопасно реализовать такую схему работы можно только в том случае, если мутации данных являются идемпотентными. В случае с TimeSeries ключ идемпотентности для события с идентификатором time_series_id
представляет собой комбинацию event_time
, event_id
и event_item_key
.
Хеджирование запросов, основанное на SLO
Мы назначаем различным конечным точкам в TimeSeries целевые показатели SLO (Service Level Objective, цель уровня обслуживания). Они служат индикаторами того уровня производительности, который, как мы полагаем, должна обеспечивать конкретная конечная точка для конкретного пространства имён. После этого можно хеджировать запросы в том случае, если ответы на них не возвращаются в течение заданного промежутка времени.
"slos": {
"read": { // Показатели SLO для конечной точки
"latency": {
"target": "0.5s", // хеджировать запросы, ответов на которых нет примерно столько времени
"max": "1s" // время срабатывания тайм-аута, после которого клиент уже не ждёт ответа
}
},
"write": {
"latency": {
"target": "0.01s",
"max": "0.05s"
}
}
}
Возврат лишь части запрошенных данных
Иногда клиент может быть чувствителен к задержкам, выражая при этом готовность принять от сервера лишь часть запрошенных результатов. Пример этого, встречающийся на практике — ограничение частоты в реальном времени. В данном случае точность не играет определяющей роли, а вот если ответ сервера задержится — он окажется практически бесполезным для клиента. Поэтому клиент предпочитает работать с любыми данными, собранными до некоего момента, а не ждать срабатывания тайм-аутов своих запросов. Клиент TimeSeries поддерживает возврат лишь части запрошенных данных. Время их возврата связано с SLO. Важно то, что мы, даже возвращая частичную выборку данных, поддерживаем порядок событий в состоянии, актуальном на момент их чтения.
Адаптивная пагинация
В начале всех операций чтения показатель разветвления устанавливается в стандартное значение, что приводит к параллельному сканированию 8 фрагментов разделов. Сервисный слой может выяснить, что time_series
относится к разряду плотных наборов данных, то есть — что для решения большинства задач чтения данных достаточно прочесть первые несколько фрагментов. В таком случае показатель разветвления для будущих операций чтения динамически подстраивается таким образом, чтобы снизить эффект «усиления чтения» в базовом хранилище данных. И аналогично, если набор данных является разреженным — может понадобиться увеличить этот показатель, учитывая при этом наличие адекватной верхней границы для него.
Ограниченное окно записи данных
В большинстве случаев активный диапазон времени для записи данных меньше, чем диапазон для чтения данных. То есть — мы стремимся к тому, чтобы данные, записанные в пределах некоего временного диапазона, как можно скорее становились бы иммутабельными, что даст нам возможность как-то их оптимизировать. Мы управляем этим с помощью настраиваемого параметра acceptLimit
, который не даёт пользователям записывать в систему данные событий, которые не укладываются в этот лимит. Например, acceptLimit
размером в 4 часа означает, что пользователи не могут записывать в систему события, возраст которых превышает нечто вроде сейчас()
— 4 часа
. Иногда мы смягчаем этот лимит, например — для записи в систему исторических данных, но для выполнения обычных операций записи мы возвращаем его к стандартному значению. После того, как данные, попадающие в некий временной диапазон, становятся иммутабельными, мы можем безопасно их кешировать, сжимать, компактифицировать их для оптимизации операций чтения данных.
Буферизация операций записи данных
Мы часто используем этот сервис для обработки огромных потоков данных, возникающих во время пиковых нагрузок. Вместо того, чтобы одномоментно перегружать базовое хранилище всеми этими данными, мы стараемся распределить нагрузку более равномерно, позволяя событиям объединяться в группы в течение коротких периодов времени (обычно измеряемых секундами). Эти события накапливаются в очередях, располагающихся в памяти в каждом из экземпляров серверов. Затем выделенные потребители событий постепенно обрабатывают очереди, группируя события по ключам разделов и осуществляя пакетные операции записи в базовое хранилище.
Очереди привязаны к каждому из хранилищ данных, так как их функциональные характеристики зависят от конкретных хранилищ, в которые осуществляется запись. Например, пакет данных, который может быть записан в Cassandra, гораздо меньше пакета, который может быть проиндексирован в Elasticsearch. Это ведёт к тому, что очереди обрабатываются с разной скоростью, и к тому, что размеры пакетов данных отличаются для разных потребителей событий, связанных с очередями.
Хотя использование очередей, размещаемых в памяти, повышает потребность в операциях сборки мусора, проводимых в JVM, мы заметили существенные улучшения, перейдя на JDK 21 со сборщиком мусора ZGC. Для того, чтобы проиллюстрировать эти улучшения, можно сказать, что применение ZGC снизило наши «хвостовые задержки» на впечатляющие 86%:
Из-за того, что мы используем очереди, размещаемые в памяти, система подвержена проблеме потери сведений о событиях в случае сбоя экземпляра сервера. Поэтому очереди мы применяем только в тех случаях, когда допустима потеря некоего объёма данных. Например — в задачах логирования или трассировки. Для случаев, когда нужен гарантированный уровень устойчивости хранения данных, или тогда, когда нужно обеспечить согласованность данных при контрольном считывании данных после записи, очереди отключают, а выполнение операций записи приводит к практически мгновенному сбросу данных в хранилище.
Динамическая компактификация данных
После того, как временной срез покинет активное окно записи, мы можем воспользоваться тем, что данные окажутся иммутабельными, и оптимизировать их ради повышения производительности чтения данных. Сюда могут входить повторная компактификация иммутабельных данных с использованием оптимальной стратегии компактификации, динамическое уплотнение или разделение шардов для оптимизации использования системных ресурсов. Тут могут применяться и другие приёмы обработки данных, направленные на улучшение производительности и надёжности системы.
В следующем разделе мы взглянем на производительность TimeSeries при обработке некоторых наборов данных, которую система демонстрирует на реальных задачах.
Производительность абстракции TimeSeries на реальных задачах
Сервис может записывать данные с низкими задержками, не превышающими десяти миллисекунд.
При этом он постоянно обеспечивает стабильные задержки при чтении данных, относящихся к отдельным событиям.
На момент написания этой статьи сервис, в глобальном масштабе, обрабатывал примерно 15 миллионов событий в секунду. Речь идёт пиковых нагрузках при работе со всеми наборами данных.
Использование абстракции TimeSeries в Netflix
Система TimeSeries Abstraction необходима для обеспечения бесперебойной работы ключевых сервисов Netflix. Вот несколько интересных примеров:
Логирование и анализ данных. Логирование, распространяемое на все приложения и микросервисы Netflix, позволяет разобраться в особенностях того, как сервисы взаимодействуют друг с другом. Эти сведения полезны при отладке ошибок и при ответах на вопросы, поступающих в службу поддержки.
Наблюдение за тем, как пользователи взаимодействуют с системой. В систему попадают миллионы записей о действиях пользователей. Это могут быть события, связанные с воспроизведением видео, с поиском, с потреблением другого контента. Сведения об этом помогают, в режиме реального времени, настраивать рекомендательные алгоритмы Netflix и улучшать впечатления пользователей от работы с нашими программами.
Анализ выпуска нового функционала и наблюдение за его работой. Наблюдение за выпуском нового функционала, и за тем, насколько хорошо он работает, позволяет инженерам Netflix оценивать то, как именно пользователи взаимодействуют с этим новым функционалом. А это, в свою очередь, помогает, основываясь на полученных данных, принимать решения о будущих улучшениях.
Наблюдение за действиями пользователей, не связанными с просмотром видео, и оптимизация соответствующих механизмов. Наблюдение за действиями пользователей, не связанными с просмотром видео, позволяет обеспечить эффективную доставку контента. При этом соответствующие механизмы, за счёт получения обратной связи в режиме реального времени, можно оптимизировать.
Выставление счетов и управление подписками. Система обеспечивает хранение исторических данных, имеющих отношение к выставлению счетов и к управлению подписками. Она гарантирует точность записей и обеспечивает работу службы поддержки пользователей.
И это — далеко не всё.
О будущих улучшениях абстракции TimeSeries
С развитием способов применения TimeSeries Abstraction, по мере того, как растёт потребность в повышении экономической эффективности системы, мы стремимся к тому, чтобы, в ближайшие месяцы, внести в неё множество улучшений. Вот некоторые из них:
Применение многоуровневого хранилища данных для повышения экономической эффективности системы. Поддержка перемещения более старых данных, к которым обращаются реже, чем к другим, в более дешёвые объектные хранилища, отличающиеся более длительным временем, проходящим до получения первого байта. Это улучшение способно сэкономить Netflix миллионы долларов.
Динамическое распределение событий по контейнерам событий. Поддержка партиционирования ключей в реальном времени, по мере поступления событий, с разделением данных на разделы оптимального размера. Такой подход может использоваться вместо некоей статической конфигурации, создаваемой во время подготовки пространства имён к работе. Новая стратегия отличается большим преимуществом, которое заключается в том, что
time_series_id
, не нуждающиеся в партиционировании, не подвергаются разбиению на разделы. Это позволяет экономить ресурсы, затрачиваемые в противном случае при возникновении «усиления чтения данных». Кроме того, с применением Cassandra 4.x, мы заметили серьёзные улучшения, касающиеся операций чтения подмножеств данных из «широких» разделов. Это может привести к тому, что мы будем менее агрессивно заниматься заблаговременным разбиением наборов данных на разделы.Кеширование. Принимая во внимание иммутабельность данных, с которыми мы работаем, мы можем организовать их интеллектуальное кеширование для отдельных временных диапазонов.
Подсчёт событий и другие механизмы их агрегации. Это — полезное улучшение, так как некоторых пользователей интересуют лишь сведения о количестве событий, произошедших в некоем временном интервале, а загрузка всех данных этих событий им не нужна.
Итоги
Система TimeSeries Abstraction — это жизненно важный компонент онлайн-инфраструктуры Netflix, отвечающей за хранение данных. Она играет значительную роль в поддержке механизмов принятия решений. Речь идёт как о решениях, принимаемых в отношении текущих событий, так и о долгосрочных решениях. TimeSeries Abstraction играет ключевую роль в обеспечении стабильной и эффективной работы Netflix в глобальном масштабе. Эта абстракция помогает оптимизировать взаимодействие пользователей с системой, предоставляя аналитические сведения об их поведении. Кроме того, TimeSeries Abstraction обеспечивает работу механизмов мониторинга производительности в ситуациях, когда система работает под большой нагрузкой.
По мере того, как Netflix продолжит внедрять инновации и расширяться, TimeSeries Abstraction останется краеугольным камнем нашей платформы, помогая нам осваивать новые границы возможностей стриминга и выходить за их пределы.
О, а приходите к нам работать? ? ?
Мы в wunderfund.io занимаемся высокочастотной алготорговлей с 2014 года. Высокочастотная торговля — это непрерывное соревнование лучших программистов и математиков всего мира. Присоединившись к нам, вы станете частью этой увлекательной схватки.
Мы предлагаем интересные и сложные задачи по анализу данных и low latency разработке для увлеченных исследователей и программистов. Гибкий график и никакой бюрократии, решения быстро принимаются и воплощаются в жизнь.
Сейчас мы ищем плюсовиков, питонистов, дата-инженеров и мл-рисерчеров.