За последние несколько лет те, кто занимается в Netflix направлением Content Engineering, перевели множество служб компании на использование федеративной платформы GraphQL. Этот процесс продолжается и сегодня. Применение федерации GraphQL даёт командам, отвечающим за различные предметные области, новые возможности. Теперь они могут, независимо от других команд, создавать и использовать собственные графовые службы, относящихся к сфере их деятельности (Domain Graph Service, DGS). Команды, кроме того, могут связывать свои предметные области с другими областями в унифицированной схеме GraphQL, доступ к которой даёт федеративный шлюз.

Давайте, в качестве примера, рассмотрим три главнейшие сущности этого графа. Владельцем каждой из них является отдельная команда инженеров:

  • Movie (Кино). В Netflix создают видеопродукты (шоу, фильмы, короткометражки и тому подобное).

  • Production (Продакшн). Каждая сущность Movie связана с сущностью Studio Production. Объект Production содержит записи обо всём, что нужно, чтобы снять Movie. В том числе — сведения о местах съёмок, о вендорах и о многом другом.

  • Talent (Талантливый человек). Людей, работающих над сущностью Movie, мы описываем, пользуясь сущностью Talent. Это — актёры, режиссёры и так далее.

https://miro.medium.com/v2/resize:fit:700/0*zWqzYt1KvSpkVSSL
Пример схемы GraphQL

После того, как сущности, подобные тем, что показаны выше, окажутся в графе, возникает вполне обычная для многих сотрудников Netflix ситуация. А именно — у них появляется потребность в том, чтобы искать одни сущности, основываясь на атрибутах других сущностей, связанных с ними. Для этого делаются соответствующие запросы. Например — «дай мне все фильмы, которые сейчас снимаются с участием Райана Рейнольдса в качестве актёра».

Как давать ответы на подобные запросы при использовании архитектуры федеративного графа, учитывая то, что работу каждой сущности поддерживает её собственная служба? Службе Movie понадобилось бы назначить конечную точку, принимающую запрос и фильтры, которые, возможно, будут применяться к данным, не находящимся во владении этой службы. Предоставленные ей данные она должна будет использовать для идентификации подходящих сущностей Movie и для их возврата.

На самом деле — любой службе, владеющей некими сущностями, может понадобиться решать подобную задачу.

Организация поиска в федеративном графе — это распространённая задача. Нас её решение привело к созданию платформы Studio Search.

Эта платформа была спроектирована так, чтобы она могла бы брать и делать пригодным для поиска фрагмент федеративного графа, представленный подграфом с корнем, находящимся в интересующей нас сущности. К сущностям подграфа можно делать текстовые запросы. Их можно фильтровать, ранжировать, фасетировать. В следующем разделе поговорим о том, как мы сделали всё это возможным.

Знакомство со Studio Search

Прочтя о том, что мы хотели позволить командам что‑то искать, вы, вероятно, подумали, что мы будем строить некий индекс. В этом направлении работала и мысль наших сотрудников! Получается, что нам надо построить индекс фрагмента федеративного графа.

Как пользователи сообщают нам о том, какая порция графа им нужна? При этом интересующая их порция графа, вероятнее всего, будет состоять из данных, предоставленных множеством служб. Поэтому возникает ещё более важный вопрос: как нам в этих условиях поддерживать актуальность индекса с учётом состояния всех этих служб?

Мы, в качестве базовой технологии для индекса, выбрали Elasticsearch. Мы решили, что для построения конвейера индексирования данных нам нужны три основных информационных составляющих:

  • Определение интересующего пользователя подграфа, корень которого соответствует сущности, на которую, в основном, будет направлен поиск.

  • События, позволяющие уведомлять платформу об изменениях сущностей, находящихся в подграфе.

  • Конфигурация индекса, задающая, например, то, должно ли некое поле использоваться для полнотекстовых запросов, и то, вложен ли в некое поле подчинённый документ.

Короче говоря — наше решение задачи поиска по графу было основано на создании индекса для интересующего пользователя подграфа. Этот индекс должен отражать актуальное состояние данных, предоставляемых различными службами федеративного графа, синхронизируясь с ними в режиме, близком к реальному времени.

GraphQL даёт нам простой и понятный механизм определения подграфов. Это — единственный шаблонный GraphQL‑запрос, который позволяет извлечь из системы все данные, которые пользователю нужно использовать при поиске.

Вот — пример шаблона запроса GraphQL. Он позволяет получить данные сущностей Movies, а так же — данные связанных с ними сущностей Productions и Talent.

https://miro.medium.com/v2/resize:fit:700/1*9FhJnH2MiYQvFHysmSQpuA.png
Пример GraphQL-запроса

Для поддержания индекса в актуальном состоянии используются события, вызывающие операции повторного индексирования отдельных сущностей при их изменении. Для вызова этих операций предпочтительными являются события захвата изменения данных (Change Data Capture, CDC). Большинство команд генерирует такие события с использованием CDC‑коннекторов Netflix. Но там, где это нужно, поддерживаются и события приложения.

Все данные, которые должны быть индексированы, загружаются из федеративного графа. Поэтому в событиях нужно передавать лишь идентификаторы сущностей. Затем эти идентификаторы будут подставлены в шаблон запроса GraphQL для загрузки соответствующей сущности и любых связанных с ней данных.

Используя тип информации, присутствующий в шаблоне запроса GraphQL, и заданную пользователем конфигурацию индекса, мы смогли создать шаблон индекса. Для этого применялся набор пользовательских анализаторов текста Elasticsearch, который хорошо подходил для использования во всех предметных областях.

В результате может быть создан Data Mesh‑конвейер. Он состоит из источника CDC‑событий, предоставленного пользователем, из процессора для обогащения событий с применением GraphQL‑запроса, тоже предоставленного пользователем, и из sink‑коннектора, публикующего данные в Elasticsearch.

Архитектура

Если собрать вместе всё вышеописанное — получится архитектурное решение, упрощённый вариант которого показан ниже.

https://miro.medium.com/v2/resize:fit:700/0*LfYcAWOrhjbp9mA7
Архитектура подсистемы индексирования Studio Search

Вот как это работает:

  1. Приложения Studio выдают события для потоков Kafka со схемой данных в Data Mesh. Делается это двумя способами:

    1. Путём взаимодействия с базой данных, мониторинг которой выполняется CDC‑коннектором, который создаёт события.

    2. Путём непосредственного создания событий с использованием клиента Data Mesh.

  2. События со схемой данных потребляются процессорами Data Mesh, реализованными на базе фреймворка Apache Flink. У некоторых сущностей имеется несколько событий, реагирующих на их изменения, поэтому мы задействуем объединяющие процессоры, комбинирующие данные из множества потоков Kafka.

    1. Процессор GraphQL выполняет GraphQL‑запрос, предоставленный пользователем. Он загружает документы из федеративного шлюза.

    2. Федеративный шлюз, в свою очередь, загружает данные из приложений Studio.

  3. Документы, загруженные из федеративного шлюза, помещают в другой топик Kafka со схемой данных. Это делается до их обработки sink‑коннектором Elasticsearch в DataMesh, где они помещаются в индекс Elasticsearch, настроенный с помощью шаблона индексирования, созданного специально для полей и типов, представленных в документе.

Обратный поиск

Вы могли заметить, что в вышеприведённых пояснениях кое‑чего не хватает. Если индекс заполняется на основе событий, содержащих сведения о сущностях Movie — тогда как он остаётся в актуальном состоянии при изменении сущностей Production или Talent? Мы решили эту задачу, применив обратный поиск. Когда делается изменение связанной сущности — нам нужно найти все основные сущности, на которые это может повлиять, и вызвать для них соответствующие события. Мы делаем это и беря нужные сведения из самого индекса, и выполняя запросы ко всем основным сущностям, имеющим отношение к изменённой сущности.

Предположим, в индексе имеется документ, выглядящий так, как показано ниже.

https://miro.medium.com/v2/resize:fit:700/0*qBiCX79OcsHNvXSh
Пример документа Elasticsearch

Конвейер замечает изменение в сущности Production, ptpId которой — abс. В этой ситуации мы можем сделать к индексу запрос на получение всех документов, в которых production.ptpId == “abc”, и извлечь movieId. Затем мы можем передать полученный идентификатор movieId дальше по конвейеру индексирования.

Масштабирование решения

Решение, к которому мы пришли, работало достойно. Команды легко смогли публиковать требования к индексам своих подграфов. Делалось это с помощью шаблонов GraphQL‑запросов. Они могли использовать существующие инструменты для генерирования событий, позволяющих индексу оставаться актуальным практически в режиме реального времени. Использование для организации обратного поиска тех данных, что уже были в индексе, позволило нам держать всю логику, необходимую для работы со связанными сущностями, внутри нашей системы, оградив пользователей от ненужных сложностей. На самом деле — система работала настолько хорошо, что нас прямо‑таки завалили просьбами на интеграцию со Studio Search. В результате этот проект начал использоваться в значительной части механизмов взаимодействия с пользователями в пределах Content Engineering.

Сначала мы выполняли запросы на интеграцию вручную, но этот подход не масштабировался, а сфера применения Studio Search росла. Нам понадобилось создать инструменты, которые помогли бы нам автоматизировать как можно больше операций по подготовке конвейера к работе. Для того чтобы это сделать, нам нужно было разобраться с четырьмя основными проблемами:

  • Как брать у пользователей все необходимые сведения для настройки конвейера, данные о необходимых им конфигурациях системы.

  • Схема данных для потоков Data Mesh делается с помощью Avro. На предыдущей схеме архитектуры системы, в блоке № 3, имеется поток, переносящий к sink‑коннектору Elasticsearch результаты GraphQL‑запроса. Ответ от GraphQL может содержать десятки полей, часто — вложенных. Ручное создание схемы Avro для подобных документов — это трудоёмкая операция, в ходе которой могут возникать ошибки. Нам нужно было очень сильно всё это упростить.

  • И, подобно вышеописанному, генерирование шаблона Elasticsearch тоже требует больших затрат времени и подвержено ошибкам. Нам надо было понять то, как генерировать эти шаблоны, основываясь на конфигурациях, предоставленных пользователями.

  • И наконец — ручное создание конвейеров Data Mesh — это тоже операция долгая и чреватая ошибками. Это так, кроме прочего, и из‑за необходимости работы с большими объёмами настроек.

Конфигурация

Для сбора у пользователей сведений о конфигурациях конвейера индексирования мы поступили так. Мы создали единый конфигурационный файл, который позволял пользователям давать высокоуровневое описание их конвейера, которое мы могли использовать для программного создания конвейера индексирования в Data Mesh. Используя это высокоуровневое описание, мы смогли сильно упростить для пользователей процесс создания конвейера. Сделали мы это благодаря заполнению обычных, но необходимых параметров конфигурации для конвейера Data Mesh.

https://miro.medium.com/v2/resize:fit:487/1*jw8US5Z96Sspa9vffxjphA.png
Пример конфигурационного .yaml-файла

Генерирование схемы данных Avro и шаблона индекса Elasticsearch

Мы применили очень похожие подходы для генерирования схемы данных Avro и шаблона индекса Elasticsearch. В целом всё сводится к тому, что нужно взять предоставленный пользователем шаблон GraphQL‑запроса и сгенерировать на его основе JSON‑код. Делалось это с помощью библиотеки graphql‑java. Шаги, необходимые для решения этой задачи, перечислены ниже:

  • Делаем интроспективный запрос к схеме федеративного графа и используем ответ для создания объекта GraphQLSchema.

  • Парсим и валидируем, с учётом схемы, шаблон GraphQL-запроса, предоставленный пользователем.

  • Посещаем узлы, используемые в запросе, применяя утилиты из graphql-java, и собираем результаты в JSON-объект. Этот сгенерированный объект и является схемой или шаблоном.

Развёртывание

Предыдущие шаги позволили собрать все сведения о конфигурации в одном файле. Выполняя их, мы смогли воспользоваться инструментами для генерирования дополнительных конфигурационных сведений, относящихся к зависимостям конвейера. После этого нужно было лишь создать точку входа для пользователей, которая позволила бы им предоставлять конфигурационные файлы для организации подготовки конвейера к работе. Учитывая то, что наши пользователи — это другие инженеры — мы решили дать им интерфейс командной строки (Command Line Interface, CLI), написанный на Python. Используя Python, мы смогли быстро предоставить пользователям первую версию CLI. В Netflix имеются инструменты, позволяющие организовать автоматическое обновление CLI. Это позволило сильно упростить доработку интерфейса командной строки. Он решает следующие задачи:

  • Валидирует предоставленный ему конфигурационный файл.

  • Вызывает службу для генерирования схемы Avro и шаблона индекса Elasticsearch.

  • Готовит план конвейера Data Mesh и создаёт его, используя API Data Mesh.

CLI — это всего лишь шаг в сторону улучшения самостоятельного развёртывания решения. Сейчас мы исследуем возможность рассматривать индексы и связанные с ними конвейеры как декларативную инфраструктуру, управляемую приложением, которое в них нуждается.

Текущие задачи

Использование федеративного графа в качестве источника документов упрощает значительную часть процесса индексирования, но при этом ставит перед нами новые сложные задачи. Если вам покажутся интересными те задачи, о которых вы прочтёте ниже — присоединяйтесь!

Заполнение индексов

Развёртывание нового индекса при добавлении или удалении атрибутов, или обновление существующего индекса — это операции, которые создают значительную дополнительную нагрузку на федеративный шлюз и на DGS компонентов. Фактически — речь идёт о пиковых значениях нагрузки. В зависимости от мощности индекса и от сложности его запросов, нам может понадобиться координировать выполнение этих операций с владельцами службы, и/или выполнять заполнение индексов в периоды низкой нагрузки на систему. Мы продолжаем заниматься поиском компромисса между скорость реиндексирования данных и нагрузкой, которую это создаёт на систему.

Обратный поиск

Операции обратного поиска, хоть и дают многие плюсы, не очень‑то удобны для пользователей. Они создают в конвейере циклическую зависимость: нельзя создать индексирующий конвейер, не выполнив обратный поиск, а для работы обратного поиска нужен индекс. Мы, правда, смягчили эту проблему, но она всё ещё вносит в нашу систему определённый беспорядок. Для выполнения этих операций, кроме того, нужно, чтобы тот, кто определяет индекс, обладал бы подробнейшими сведениям о событиях для связанных сущностей, которые нужно включить индекс. Эти знания, в зависимости от индекса, могут относиться к множеству различных предметных областей. Например, у нас есть один индекс, охватывающий восемь таких областей.

Согласованность индекса и базовых документов

По мере того, как индекс становится всё сложнее — растёт вероятность того, что он будет зависим от большего количества DGS, а так же — вероятность ошибок при загрузке необходимых документов из федеративного графа. Эти ошибки могут вести к тому, что документы в индексе окажутся неактуальными, или даже к тому, что нужных документов в индексе попросту не будет. Владельцу индекса, а так же — другим командам, относящимся к определённой предметной области, часто бывает нужно вникать в разбор подобных ошибок, анализировать связанные с ними сущности. Тот, кто этим занимается, оказывается в незавидном положении, не имея возможности решить соответствующие проблемы независимо. Когда ошибки исправляют — повторная выдача событий, давших сбой, выполняется вручную. Возможно появление временной разницы между тем моментом, когда служба снова оказывается способной успешно выдавать данные, и тем моментом, когда индекс будет этим данным соответствовать.

Итоги

В этом материале мы рассказали о том, как наша инфраструктура индексирования перемещает данные любого подграфа федеративного графа Netflix Content в Elasticsearch, и о том, как она поддерживает синхронизацию этих данных с источником достоверной информации.

О, а приходите к нам работать? ? ?

Мы в wunderfund.io занимаемся высокочастотной алготорговлей с 2014 года. Высокочастотная торговля — это непрерывное соревнование лучших программистов и математиков всего мира. Присоединившись к нам, вы станете частью этой увлекательной схватки.

Мы предлагаем интересные и сложные задачи по анализу данных и low latency разработке для увлеченных исследователей и программистов. Гибкий график и никакой бюрократии, решения быстро принимаются и воплощаются в жизнь.

Сейчас мы ищем плюсовиков, питонистов, дата-инженеров и мл-рисерчеров.

Присоединяйтесь к нашей команде

Комментарии (0)