Контекст и мотивация

Качество данных — это не просто вопрос наличия значений в столбцах таблиц. Это вопрос доверия к данным в целом. Мы можем создавать сложные системы отчётности, но если на каком-то этапе ETL в данных возникают пропуски, дубликаты или они не соответствуют ожиданиям, вся система теряет доверие потребителей. В результате приходится тратить много времени на поиск и устранение причин таких проблем.

Data Quality в Ozon Data Platform выходит за рамки простой технической проверки значений в таблицах. Это сложный процесс, в котором задействованы как люди, так и технологии.

Наши пользователи — аналитики и ETL-разработчики, которые следят за качеством аналитических витрин. Технологии Data Governance позволяют им эффективно управлять своими данными, а инструмент Data Quality, в частности, даёт возможность создавать проверки качества данных для объектов в хранилищах информации.

Регулярный запуск этих проверок обеспечивает своевременное оповещение пользователей о возможных проблемах в бизнес-процессах.

В течение нескольких лет процесс обеспечения качества данных разрабатывался только для детального уровня в общем хранилище данных (DWH). Аналитики DWH отвечали за создание и контроль проверок, а разработчики — за соответствие данных ожиданиям в этом слое.

Однако со временем возникла потребность масштабировать решения Data Governance на другие хранилища информации. Одним из таких источников стало хранилище на базе Hadoop Distributed File System (далее HDFS), данные в котором хранят многие команды в Ozon. Количество таблиц у первичного пользователя насчитывает порядка 7-9 тысяч, их суммарный нереплицированный размер — около 200 терабайт, а команд, которые используют YARN-очереди для подключения к HDFS через Spark и производят данные, около 80.

Мы поставили перед собой задачу предоставить этим пользователям возможность заводить проверки качества данных на своих таблицах в HDFS. Именно об этом пойдёт речь в статье.

Общая архитектура

Текущая реализация Data Quality включает в себя бэкэнд-сервис, написанный на Go. Этот сервис хранит SQL-шаблоны проверок и список объектов, которые можно проверять. Кроме того, он предоставляет возможность планировать и запускать проверки качества данных в хранилище по заданному расписанию или в ответ на определённые события. Frontend-сервис предоставляет пользователю единую точку входа, через которую он может создавать проверки, запускать их и просматривать результаты.

В первичной реализации сервис имел возможность подключаться и запускать проверки качества данных только в определённой SQL column-oriented-базе данных. Для реализации чтения файлов из HDFS, а именно parquet-файлов, нам потребовалось рассмотреть ряд решений, которые позволяли бы подключаться к распределённому хранилищу и получать доступ к данным.

Были рассмотрены следующие подходы.

Подход №1: прямое чтение через HDFS Native Client и парсинг паркетов в Go-структуры.

В реализации такого подхода минусы видны сразу, отсортируем их по возрастанию критичности:

  • требуются CGO- и Hadoop-зависимости;

  • невозможность фильтрации внутри таблиц;

  • колоссальное потребление памяти Go-приложения.

Из плюсов можно отметить разве что минимальную задержку для получения малых файлов прямиком в память, однако ощутить это преимущество не удастся, так как у нас таблицы чаще занимают десятки или даже сотни Gb.

Подход №2: использование промежуточного SQL-движка.

Наиболее подходящей функциональностью обладает Trino.

Trino теоретически может рассматриваться как способ выполнять SQL-запросы непосредственно по данным, хранящимся в HDFS. Это распределённый SQL-движок, который поддерживает широкий набор коннекторов и умеет объединять данные из разных источников. Однако в нашем случае использование Trino не выглядит оправданным, потому что Trino не работает напрямую с «сырыми» файлами в HDFS. Ему необходим слой метаданных, в котором таблицы должны быть заранее зарегистрированы со схемами и статистикой. Это означает необходимость предварительной подготовки данных и ограничивает гибкость при работе с непостоянными наборами. В случае отсутствия актуальных статистик оптимизатор Trino может строить неоптимальные планы выполнения и деградировать по производительности.

Подход №3: промежуточный сервис, который смог бы принимать gRPC-запросы и конвертировать их в спаковые операции.

Ожидается, что функционал этого сервиса позволит выступить в роли промежуточного клиента между Go и Spark. Согласен, звучит на грани фантастики, однако, у Apache появилось такое решение — Spark Connect Server, ознакомиться с реализацией и описанием можно по ссылке.

Для нашей задачи данное решение подходит наилучшим образом. Spark Connect Server реализует стабильный gRPC-интерфейс, полностью изолирующий исполнение клиента от среды кластера HDFS. Клиентское приложение не встраивает Spark-драйвер, не приходится тянуть JVM, Spark-библиотеки и зависимости Hadoop. Это не просто удобный, а единственно корректный способ интеграции внешнего клиента на Go со Spark.

Из минусов стоит отметить, что клиентский функционал (библиотека spark-connect-go), хотя и создан Apache, но помечен как экспериментальный. Тем не менее наши тесты показали, что библиотека предоставляет достаточно возможностей для решения наших задач.

Кроме того, важно отметить, что промежуточный сервис представляет собой дополнительный элемент в архитектуре системы, который может стать потенциальным местом отказа. Однако если корректно его настроить, уделив внимание стабильности, Spark Connect Server показал себя достаточно надёжным.

Итак, вот как выглядит архитектурное решение: сервис DQ, написанный на языке Go, запускает проверки для HDFS, вызывая RPC в промежуточном сервисе, который поддерживает подключение Spark Connect Server к HDFS. Задача разработчика — реализовать сервис для управления Spark Connect Server и создать клиент для межсервисного взаимодействия между DQ и Spark Connect Server.

Spark Connect Server вынесен в отдельный сервис, чтобы изолировать вычислительное окружение от основного оркестрационного, а также для обеспечения независимого масштабирования, обновления и контроля стабильности.

Этап разработки

Пропустим этап декомпозиции и согласования решений и перейдём к этапам реализации.

Начальный этап — разработка микросервиса для Spark Connect Server.

Определим несколько базовых требований к сервису:

  • сервис должен быть способен запускать N экземпляров Spark Connect серверов в N экземплярах самого сервиса, обеспечивая равные условия для каждого экземпляра;

  • взаимодействие с HDFS происходит через YARN, аллоцируя определённое количество ресурсов в очереди;

  • ожидание ответа от сервиса о результатах проверок должно укладываться в некоторый SLI, определим его позже.

Начнём с того, что мультиподовая архитектура автоматически масштабирует количество серверов до N, а внутренняя балансировка запросов, реализованная в K8s, распределяет их по принципу round-robin.

Эта особенность архитектуры делает невозможным использование API Spark Dataframe без дополнительных изменений. Дело в том, что каждый под запускает свой собственный экземпляр приложения Spark Connect Server, и между ними не происходит обмена состояниями.

Например, если я отправлю запрос на создание временной таблицы, а затем попытаюсь обратиться к ней в следующем запросе, балансировщик может отправить меня в любой другой под из N-1, где моя временная таблица ещё не создана, что приведёт к ошибке. Такая же проблема возникает и при кешировании результатов чтения parquet-файлов. К счастью, для проверки качества данных подойдёт простой Spark SQL-запрос, результат которого можно получить одним запросом к серверу.

Ко всему прочему, мы научились управлять маршрутизацией трафика и делаем походы из DQ-сервиса в Spark Connect Server внутри дата-центра, что позволяет нам снизить потери времени на общение по сети. Также это позволило не угадывать, в какой под придёт запрос от DQ, так как мы развернули строго по одному инстансу Spark Connect на дата-центр. Последняя фича позволила бы нам управляться со stateful-запросами без обмена состояниями между инстансами Spark Connect, но мы остановились на простых SQL-запросах вместо API Spark Dataframe.

Хотелось бы выразить благодарность нашим коллегам за их усилия по точной маршрутизации трафика. Они успешно реализовали алгоритм, описанный в статье, который мы использовали в своей работе.

Важно уделить внимание конфигурации Spark Connect Server с точки зрения необходимых ресурсов. Наша цель — разработать систему, которая сможет эффективно справляться с большим объемом работы. Мы ожидаем, что количество ежедневных проверок будет значительным, а их график может быть довольно плотным.

Следует учитывать, что ресурсы, такие как ядра VCores и объём их памяти, небезграничны. Нам необходимо рассчитать их точное количество, чтобы обеспечить бесперебойное выполнение проверок качества данных.

Задача заключается в том, чтобы определить, сколько гарантированных ресурсов необходимо выделить для размещения N Spark Connect Servers, чтобы обеспечить быстрое и эффективное выполнение проверок. Предполагается, что проверка должна быть выполнена в течение пяти минут (SLA = 5 минут). Эффективность же означает планирование задач таким образом, чтобы ресурсы максимально утилизировались, а результаты проверок были получены в кратчайшие сроки.

Конфигурация Spark Connect Server должна помочь справиться с этой задачей. Определим, на какие параметры мы можем влиять:

R_{total} — общее количество гарантированных ресурсов в YARN, пусть эта метрика измеряется в VCores;

R_{task} — максимальное потребление ресурса для одной задачи;

N_{max concurrent}=\frac {R_{total}} {R_{task}} — максимально возможное количество одновременных задач;

λ — средняя интенсивность поступления задач (\frac {task} {sec});

μ — средняя скорость выполнения одной задачи (sec).

Тогда ρ =  \frac {λ * μ} {k} – пропускная способность системы, где k — число «слотов».

Если ρ >=1, система считается перегруженной, что приводит к зависанию задач в очереди — критерий эффективности не выполняется. Так как максимальное количество ресурсов у нас зафиксировано, а величину среднего потребления ресурса для одной задачи требуется измерить, влиять мы можем только на интенсивность поступления задач.

Исходные данные: таблица размером 100 ГБ с проверкой на уникальность высокочастотного поля. В примере на скрине проверка некоторой таблицы в HDFS, которая физически находится в директории /warehouse/table в формате parquet. Данный тип проверки пользуется наибольшей популярностью у пользователей.

Цель — определить soft cap по CPU, то есть найти такое приращение dCPU, после которого время выполнения запроса меняется незначительно.

Тесты проводятся в диапазоне от 20 до 160 CPU с шагом 10 CPU, поскольку мы можем выделить не более 160 ядер на один сервер Spark Connect.

Soft-cap наблюдается уже при 50 CPU, так как при дальнейшем увеличении CPU на проверку изменение приращения скорости выполнения изменяется линейно. В свою очередь, до 50 CPU изменение приращения весомое. Действительно, при увеличении ресурсов до бесконечности мы упрёмся в hadr-cap, при котором каждый шаг в 10 CPU будет давать приращение в скорости близкое к 0. Данным тестом нам удалось доказать, что на одну проверку нам нужно как минимум 40 ядер CPU для соблюдения SLA.

Таким образом, мы можем рассчитывать на то, что одна проверка, запущенная на одном Spark Connect, выполнится за 1 минуту 30 секунд. Две проверки при равном (FAIR scheduler) распределении ресурсов между задачами — за 2 минуты 48 секунд. Три проверки — за 3 минуты 36 секунд, четыре — за 3 минуты 48 секунд. По результатам исследования более 4 проверок запускать бессмысленно, так как ресурсов банально не хватит для соответствия SLA в 5 минут на одну проверку.

Финальный вывод: для запуска worst-case-проверки на одном Spark Connect server не должно быть запущено больше четырёх проверок. Исходя из эксперимента, можно сделать вывод о средней интенсивности поступления задач — она приблизительно равна λ≃  \frac {4} {226}≃1 \frac {task} {min}

Стоит подчеркнуть, что YARN-очередь для запуска проверок ограничена и едина для всех инстансов Spark Connect. By design-клиент Data Quality запускает проверки в режиме leader election и, согласно нашей архитектуре, на двух разных инстансах Spark Connect не может быть одновременно запущены проверки, однако лидер может переключиться. Для того чтобы обеспечить «мастеру» Spark Connect наибольшее возможное количество ресурсов к моменту, когда потребуется запускать проверки, аллокация ресурсов происходит динамически. Изначально инстансы запущены на минимальных ресурсах, способных поддержать жизнеспособность Spark Connect.

Результаты

Мы создали систему, способную запускать проверки из нашего сервиса Go в хранилище HDFS и получать ответы в прогнозируемые сроки.

Тезисно перечислю трудности, с которыми мы столкнулись в период разработки решения и пути их решения.

Стабильность запусков:

  • количество параллельных задач, выполняемых в рамках одного приложения, контролируется на клиенте;

  • настроена дефолтная политика retry для проверок, которые по какой-то причине провалились после первого запуска.

Стабильность сервиса:

  • количество логов снижалось с помощью конфига log4j, переданного напрямую в extraJavaOptions;

  • троттлинг CPU на pod Kubernetes при запуске запросов удалось снизить, зафиксировав опцию ActiveProcessorCount на драйвере, указав его в spark.driver.extraJavaOptions, равным количеству requests для pod;

  • в силу того, что у нас нет возможности использовать Spark Connect версии выше 3.5.5, не удалось воспользоваться функционалом рестарта сервера при внутренних ошибках. Для решения данной проблемы был написан парсер stdout логов на bash, который отслеживает остановку SparkContext.

Безопасность:

  • для доступа к командным данным владельцы директорий в HDFS могут самостоятельно выдать доступы сервисной учётной записи в приватные директории.

Однако, несмотря на наши достижения, всё ещё существуют некоторые проблемы:

  • мы опасаемся, что в будущем количество проверок может увеличиться, и задержка их выполнения станет критической для нашего сервиса;

  • некоторые таблицы имеют не самый удачный дизайн. Например, они могут быть партиционированы только по размеру (по умолчанию HDFS делит данные на физически равные части фиксированного размера, чтобы равномерно распределять нагрузку по узлам кластера) или содержать большое количество высоко кардинальных полей, что может приводить к неравномерному распределению данных.

Мы запустили эту функциональность в продакшн и получили обратную связь от пользователей по их сценариям использования, а также сформировали небольшой бэклог доработок, который позволит избежать перегрузок системы. Для теста пользователи выбрали пачку таблиц суммарным объёмом примерно в 2,6 ТБ и запускают проверки на них по расписанию. Метрики скорости выполнения задач показывают, что мы укладываемся в SLA.

Основным показателем качества работы наших проверок является их скорость. Мы определяем её как величину, обратную времени, которое проходит с момента запуска проверки до получения результата.

В ближайшее время мы планируем разработать инструмент для проверки таблиц и сбора статистики по ним. Этот инструмент сможет заранее предупреждать пользователей о том, что конфигурация будет создавать чрезмерную нагрузку на систему, и предлагать альтернативные варианты конфигурации, что позволит им более осознанно подходить к настройке своего окружения. Но это уже тема для другой статьи.

Комментарии (1)


  1. oneti
    01.11.2025 15:00

    Что-то вы забыли упомянуть, что на уровне hdfs/yarn Spark Connect виден как одна учетка.

    Не очень понятно как вы выстраиваете свою архитектуру...