Как с помощью Spark API и библиотек с открытым исходным кодом добиться лучшей наблюдаемости данных в вашем приложении

Из этой статьи вы узнаете, как использовать Listener API и библиотеки качества данных, чтобы улучшить наблюдаемость данных для Apache Spark на разных уровнях.

Spark занимает очень важное место в современном дата стеке. Таким образом, чрезвычайно важно обеспечить хороший уровень наблюдаемости для ваших сред Spark. Существует множество вариантов мониторинга Spark, включая SaaS программы, которые предоставляют вам уже настроенные дашборды под метрики Spark и Spark SQL. Но что, если вам этого недостаточно?

Типичный сетап Spark-приложения, независимо от того, является ли он самостоятельным или управляемым решением, включает в себя несколько операционных дашбордов для мониторинга работоспособности кластера. И хоть эти дашборды очень полезны, они представляют лишь общий обзор инфраструктуры, а не фактические метрики, связанные с данными. Да, на их основе мы можем сделать вывод, что с приложением что-то не так, когда ЦП увеличил нагрузку или кластеру не хватает оперативной памяти, но они не помогут нам в ситуации, когда источник изменил схему или данные, полученные из другого отдела, повреждены. Большинство проблем, с которыми сталкиваются инженеры, вызваны именно данными, а не базовой инфраструктурой, поэтому им приходится тратить много времени на воспроизведение проблем или возиться с файлами и бакетами, совершая поистине детективную работу. Именно здесь мониторинг приложения нас может очень сильно выручить.

Разрешение каждой ситуации требует разного уровня видимости, поэтому  дата-инженеры должны иметь возможность работать на более глубоком уровне, чем метрики работоспособности. В противном случае вы можете тратить значительное количество времени на отладку проблем с качеством данных в Spark.

В этом руководстве вы узнаете, как достичь и высоких и низких уровней наблюдаемости данных для Spark. На высоком уровне вы будете использовать внутренние системы Spark, такие как Listener API и Query Execution Listeners (слушатели выполнения запросов). На низком уровне вы узнаете, как использовать библиотеки для отслеживания показателей качества данных.

Научившись обеим техникам, у вас будет возможность выбрать ты, которая лучше всего подходит для проблемы, которую вы пытаетесь решить.

Низкоуровневые способы мониторинга Apache Spark

Слушатели Spark

Это очень старый и надежный способ получения метрик. Как ни странно Spark UI использует этот же механизм для визуализации метрик. API слушателей Spark позволяет разработчикам отслеживать события, которые Spark генерирует во время выполнения приложения. Этими событиями обычно являются начало/конец приложения, начало/конец джобы, начало/конец этапа и т. д. Полный список можно найти в Spark JavaDoc. Слушатели Spark очень легко настроить и использовать для получения метрик. После выполнения каждой из операций Spark вызовет Spark Listener и передаст в его метод некоторые метаданные. Они будут включать такие вещи, как время выполнения, прочитанные/внесенные записи, прочитанные/записанные байты и прочее.

Этот очень простой и низкоуровневый мониторинг качества данных будет проверять количество и размер записей. Представьте, что у вас есть некоторый джоб, производящая какие-либо преобразования/аналитику входящих наборов данных, которая выполняется ежедневно. Вы можете написать слушатель, который проверяет, сколько записей было прочитано из ввода, и сравнивает его с результатом предыдущего дня. Когда разница значительна, можно предположить, что с источником данных что-то не так.

Однако этот подход предполагает написания собственных решений для мониторинга. Значения метрик должны где-то храниться, нужно настраивать механизмы оповещения. Когда код приложения изменится, все ключи метрик также изменятся, и за этим нужно следить.

Однако даже самый простой Spark Listener может дать некоторое представление о ваших данных.

Вот пример такого слушателя:

public class SomeSparkListener extends SparkListener {

/**
    * Этот достаточно простой слушатель выводит метрики, собранные для каждого этапа
    *
    * @param stageCompleted
    */
   @Override
   public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
       StageInfo stageInfo = stageCompleted.stageInfo();
       Iterator<AccumulatorV2<?, ?>> it = stageInfo.taskMetrics().accumulators().iterator();
       while (it.hasNext()) {
           AccumulatorV2<?, ?> next = it.next();
           String key = next.name().get();
           Object value = next.value();
           System.out.printf("key: %s, value: %s%n", key, value);
       }
   }
}

Вы можете добавить Spark Listener в свое приложение несколькими способами:

Добавьте его программно:

SparkSession spark = SparkSession.builder().getOrCreate();
spark.sparkContext().addSparkListener(new SomeSparkListener());

Или передайте его через параметры драйвера кластера spark-submit/spark:

spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"

Слушатели выполнения запросов Spark

Это еще один готовый механизм мониторинга в Spark. Вместо того, чтобы сосредотачиваться на метриках очень низкого уровня, слушатель выполнения запросов (Query Execution Listener) позволяет разработчикам подписываться на события завершения запроса. Он предоставляет метаданные более высокого уровня о выполненном запросе, такие как логические и физические планы, а также показатели выполнения.

Вы можете получить метрики, такие как прочитанные/внесенные записи по запросу, но на этот раз агрегированные для всего запроса, а не для конкретных задач/джобов/этапов.

Также можно извлечь очень полезную информацию из таких планов, ​​как расположение данных и схема. Вы можете извлекать и сохранять схему вместе с измерениями датафрейма и сравнивать ее с предыдущими запусками, генерируя оповещения, когда что-то идет не так.

Однако извлечение данных из плана может быть затруднительно, поскольку вы вынуждены использовать низкоуровневый API Spark.

Кроме того, все операционные нагрузки, связанные с реализацией механизмов хранения метрик и оповещением, все еще присутствуют. То, что вы получите от Spark, — это просто метаданные. Ответственность за их обработку лежит на разработчике.

Вот пример простого слушателя выполнения запросов, который выводит план и метрики:

public class ExampleQueryExecutionListener implements QueryExecutionListener {

   /**
    * Выводим план и метрики запроса
    *
    * @param funcName
    * @param qe
    * @param durationNs
    */
   @Override
   public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
       System.out.println(qe.executedPlan().prettyJson());
       Iterator<Tuple2<String, SQLMetric>> it = qe.executedPlan().metrics().iterator();
       while (it.hasNext()) {
           Tuple2<String, SQLMetric> next = it.next();
           System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
       }
   }

   @Override
   public void onFailure(String funcName, QueryExecution qe, Exception exception) {

   }
}

Слушатели выполнения запроса можно добавить либо программно, либо в конфигурации:

В коде приложения:

In application code:

SparkSession spark = SparkSession.builder().getOrCreate();
spark.listenerManager().register(new ExampleQueryExecutionListener());

С помощью spark-submit:

spark-submit --conf "spark.extraListeners=ai.databand.ExampleQueryExecutionListener"

Реализация низкоуровневого мониторинга может оказаться довольно сложной задачей, однако такой “системный” способ мониторинга имеет огромное преимущество: он не требует дополнительных вычислительных ресурсов. Поскольку метаданные создаются и записываются внутренними компонентами Spark, это не приводит к снижению времени выполнения запроса.

Использование слушателей для мониторинга позволяет нам вообще не трогать код приложения! Это может иметь огромные преимущества, когда вы хотите отслеживать данные в актуальных и старых версиях приложения, но не располагаете бюджетом для внесения изменений. Просто напишите слушатель, передайте его через конфигурацию spark и получите представление о своих данных.

Высокоуровневые способы мониторинга Apache Spark

Ручная проверка качества данных

Вы можете значительно повысить свою уверенность в поступающих данных, проверяя их вручную. Допустим, мы ожидаем некоторое количество записей во входном источнике данных, и это число обычно не должно быть меньше X. Мы можем написать что-то наподобии:

df = spark.read("path")
if (df.count < X) {
  throw new RuntimeException("Input data is missing")
}

И наши возможности здесь практически безграничны. Мы можем сравнивать подсчеты, считать ненулевых значения, выведенные схемы и т. д.

Использование библиотек качества данных

Поскольку многие проверки качества более или менее тривиальны, например, обеспечение правильной формы и содержимого вашего датафрейма, сообщество уже разработало удобные библиотеки для таких проверок. Одной из таких библиотек является Deequ. В большинстве случаев она предоставляет богатый предметно-ориентированный язык (DSL). Обязательно ознакомьтесь с ней. Также в ней есть некоторые продвинутые функции, такие как возможность профилирования столбцов — вычисление минимума/максимума/среднего/процентиля, рассчет гистограмм, обнаружение аномалий и многое другое.

Рассмотрим следующий пример из документации Deequ:

val verificationResult = VerificationSuite()
  .onData(data)
  .addCheck(
    Check(CheckLevel.Error, "unit testing my data")
      .hasSize(_ == 5) // мы ожидаем 5 строк
      .isComplete("id") // не должно быть NULL
      .isUnique("id") // не должно содержать дубликатов
      .isComplete("productName") // не должно быть NULL
      // должно содержать только значения "high" и "low"
      .isContainedIn("priority", Array("high", "low"))
      .isNonNegative("numViews") // не должно содержать отрицательных значений
      // хотя бы половина описаний должна содержать URL
      .containsURL("description", _ >= 0.5)
      // половина элементов должна иметь менее 10 просмотров
      .hasApproxQuantile("numViews", 0.5, _ <= 10))
    .run()

Как вы могли заметить, у нас есть огромный набор проверок, завернутый в приятный и функциональный DSL.

Что еще более важно, Deequ предоставляет возможность сохранять результаты проверок и автоматически проводить сравнения с предыдущими запусками. Это можно сделать с помощью репозиториев метрик. Можно написать собственную реализацию и легко интегрировать Deequ в существующую инфраструктуру мониторинга.

Хотя высокоуровневые проверки качества приложений намного более гибкие, чем низкоуровневые подходы, у них есть большой недостаток: снижение производительности. Поскольку при каждом вычислении выполняется spark операция, в некоторых случаях накладные расходы могут быть значительными, особенно для больших наборов данных. Каждое “count” и “where” может вызывать полное сканирование. Spark сделает внутри все возможное для оптимизации планов выполнения, но вы должны учитывать эти последствия и убедиться, что профилирование данных не повлияет на вашу производительность.

Заключение

Мы рассмотрели несколько способов мониторинга качества данных для приложений Spark. Низкоуровневый подход использует Spark Event Listeners API и предоставляет доступ к низкоуровневым метрикам, таким как прочитанные/внесенные записи, логические/физические планы, и может быть полезен для определения тенденций и обеспечения правильных результатов конвейера данных, давая нам обзор существующих приложений без каких-либо дополнительных модификации кода. Подходы высокого уровня, такие как проверка данных вручную или использование библиотек качества данных, намного удобнее, но имеют недостатки, такие как снижение производительности.

Как и в любой реальной ситуации, всегда есть компромиссы и лучшие сценарии для обоих подходов, в зависимости от типа вашего приложения. Используйте их с умом.

В Databand мы используем оба подхода, чтобы получить полный набор метрик для мониторинга приложений Spark. В то время как в нашем ядре мы используем слушатели Spark для построения тенденций метрик и происхождения данных, мы также реализовали удобное хранилище метрик для Deequ, а также возможность отслеживать отдельные метрики, рассчитанные вручную.


На сегодняшний день инструменты Оркестрации — это отраслевой стандарт для организации получения, обработки и сохранения данных из сотен и даже тысяч разнородных источников с разнообразной частотой обновления и природой.

Сценарии использования платформ Оркестрации разнообразны: вы можете просто организовать регулярные select -> group by-> insert из production базы данных в "холодную" реплику под аналитику, а можете написать целый сервис, который будет раз в час обновлять данные, дообучать ML-модель и поставлять актуальные прогнозные значения конечным пользователям.

Данная технология — неотъемлемый инструмент в арсенале современного Data Engineer и администратора вычислительного кластера.

Приглашаем всех желающих на открытый урок, на котором мы подробно разберем, что же такое платформы Оркестрации, какие решения есть сегодня на рынке и даже углубимся в практический пример использования одной из самых распространенных платформ на сегодня: Apache Airflow.

Комментарии (1)


  1. VovaQbinskiy
    01.03.2022 19:10

    Спасибо за статью