Привет, Хабр! Меня зовут Максим Шаланкин, я Data Science Team Lead в команде Финтеха Big Data МТС. Из этой статьи вы узнаете, как оценивать качество данных при помощи алгоритма adversarial validation с использованием spark user defined function. В частности, я расскажу, почему такой подход хорошо масштабируется и может быть успешно применен к различным типам данных, становясь универсальным инструментом для встраивания в ваши продуктовые процессы. Все подробности — под катом. 

Обучающая выборка для ML-модели дрейфует в океане неизвестных данных
Обучающая выборка для ML-модели дрейфует в океане неизвестных данных

В повседневных задачах нам важно понимать, что для ML-моделей нужны самые качественные данные. Поэтому мы применяем эффективную Data Quality проверку на основе алгоритма adversarial validation и spark user defined function. 

Статья достаточно большая, но структура не очень сложная:

  1. Из каких подходов DQ мы выбирали.

  2. Сравнение работы этих подходов.

  3. Наш выбор в скорости и качестве — spark udf реализация алгоритма adversarial validation.

  4. Как мы внедрили выбранный подход у себя.

Настоящая сила рождается в голове данных. Она заставляет идти делать DQ, даже когда тело модель хочет упасть. Беар Гриллс.

Из каких подходов DQ мы выбирали

Оценки Data Quality бывают разные, но цель у них одна — понять, что с данными всё в порядке. А если есть какая-то проблема, понять её причину и масштаб. Для этого существует много различных подходов — от самых простых до очень сложных. Каждый выбирает себе тот, который покрывает потребности в решаемых задачах. 

Мы в команде финтеха МТС Digital обрабатываем огромные объёмы данных, поэтому важно, чтобы эти данные были надлежащего качества. Цена возможной ошибки растет пропорционально росту объёма данных. Тут и время, которое тратят люди и сервера на обработку данных, и сложности с поиском ошибок из-за объёмов и разной структуры в данных. Проблем становится ещё больше, когда речь заходит о построении ML-моделей. По известному всем правилу «Garbage in, garbage out» качество входных данных определяет качество итоговой модели. Поэтому мы используем большинство известных подходов к оценке качества.

Довольно частая задача — понять, насколько сильно данные за текущую дату (business_dt = t) отличаются от данных за предыдущую дату (business_dt = t-n). Будем смотреть на изменение данных относительно эталонной даты при помощи следующих алгоритмов (на примере numeric данных):

  • Population stability index (psi)

  • Kolmogorov-smirnov statistic (kstest)

  • Adversarial Validation

Это далеко не исчерпывающий список возможных проверок данных, но он частично отражает то, чем пользуемся мы в своей ежедневной работе.

Вот небольшие вводные по каждому из этих способов оценки данных: 

Метрика Population Stability Index (PSI) предназначена для измерения изменений в распределении переменной между двумя выборками в разные периоды времени.

Эта расчётная величина показывает различия в исходных данных, которые разделили на бины вручную (над непрерывными данными), или же они уже были поделены таким образом заранее (категориальные данные).

Критерий Колмогорова-Смирнова — статистический метод служит для определения, принадлежит ли выборка известному закону распределения. Путём сравнения кумулятивных функций распределения и анализа максимального вертикального расхождения можно оценить соответствие выборки теоретической модели. Критерий чувствителен к отклонениям как в центре, так и на краях распределения, обеспечивая более всестороннюю проверку соответствия выборки закону распределения.

Из этого критерия нам интересна величина статистики — точка, в которой сумма накопленных расхождений между двумя распределениями является наибольшей. 

Мы будем сравнивать два непрерывных распределения данных и оценивать их изменение. 

Алгоритм adversarial validation — это подход в машинном обучении, который позволяет оценить, насколько две выборки данных похожи друг на друга с использованием ML-модели. Если модель успешно различает две выборки, это указывает на потенциальные различия между ними.

Adversarial Validation — самый интересный подход, когда дело касается оценки изменений в большом объёме данных. В качестве алгоритма adversarial classifier удобнее всего брать градиентный бустинг. Алгоритмы градиентного бустинга очень хорошо параллелятся. Бустинг лучше других алгоритмов работает с необработанными данными — мы не тратим время на предобработку (или делаем её по минимуму), а подаём данные в исходном виде (в том, в котором они были записаны в базу данных). 

Кстати, мы ранее уже рассказывали про этот алгоритм, почитать статью можно тут.

Сравнение работы подходов

На этих трёх критериях мы проведём оценку ситуаций, которые могут возникнуть с вашей обучающей выборкой для ML-модели в реальной жизни. Мы рассмотрим такие кейсы, как: 

  • изменение распределения данных

  • перекосы в данных

  • появление равномерных пропусков в данных

  • появление экстремальных пропусков в данных

После этого посмотрим на проблему в комплексной ситуации, когда меняется более, чем один признак. Мы хотим сравнить эти оценки между собой при однотипном изменении данных. Нам нужно разобраться в том, как каждый из этих критериев меняется при изменении данных.

На графиках ниже симулируем поэтапное изменение данных — каждый новый день данные немного отличаются от начального дня наблюдений. Для каждого нового дня мы считаем наши оценки и визуализируем их. Метрика roc auc оценивает нашу adversarial validation модель, kstest показывает величину statistic, а psi — само значение psi.

Проверяем распространённую проблему — изменение вида распределения данных.
Проверяем распространённую проблему — изменение вида распределения данных.

По изменениям графика выше мы можем сделать следующие выводы: все алгоритмы справляются с поставленной задачей — каждая из трёх метрик растёт вместе с счётчиком числа дней по мере того, как исходное распределение становится всё менее похожим на актуальное. Значит, это посильная задача для трёх наших алгоритмов — поймать момент, когда нужно бить тревогу и говорить, что произошел data shift. 

Проверим наши алгоритмы на другой задаче — данные меняются менее предсказуемым образом. Посмотрим на более нетипичное изменение исходных данных, которое тем не менее вполне вероятно может произойти в обычной жизни. Цель всё та же — сравнить работу алгоритмов между собой и понять, как каждый из них реагирует на изменения разного вида.

Здесь мы видим схожие паттерны роста метрик, но есть и отличия. Roc auc и psi примерно одновременно пробивают свои «тревожные границы», но psi в определенный момент (примерно на 28 день) возвращается к зоне «нормальности», когда мы можем говорить, что в данных отличий нет. Алгоритм adversarial validation так себя не ведёт.

Проверим срабатывание наших проверок при равномерном появлении пропусков в данных.

Видно, что каждый критерий, как и в первом случае, меняется с ростом числа дней, но psi пробивает границу слишком поздно, нежели adversarial validation. По темпам роста можно сказать, что roc auc растёт быстрее остальных метрик, у kstest рост более линейный, а psi явно запаздывает и достигает критической отметки только к концу наблюдаемого периода. 

Следующая потенциальная проблема — пропуски в данных будут появляются с конца распределения.

В этом кейсе psi показал себя лучше, так как adversarial validation преодолел черту сильно позже. 

При разборе этих симуляций я почти не затронул график метрики kstest. Причина кроется в отсутствии какого-либо общепринятого уровня, после достижения которого нам следует бить тревогу. Он может сильно меняться с изменением данных — и его нужно каждый раз интерпретировать и пересматривать. Тем не менее, как мы уже увидели, kstest хорошо коррелирует с другими метриками, поэтому мы можем рассматривать его, как альтернативу. 

В реальной жизни данные редко меняются по одному признаку за раз. Они могут меняться одновременно и динамически. Как правило, мы наблюдаем сложную цепочку изменений, где могут быть неочевидные зависимости. Поэтому и DQ к данным нужно применять комплексно — с целью отследить процесс изменения исходного набора данных.

Проведём очередную симуляцию, но на этот раз будем менять шесть столбцов данных за раз. И kstest, и psi умеют работать одновременно только с одним столбцом, поэтому будем примять их последовательно, а результаты суммировать. С adversarial validation таких проблем нет, поэтому за одну итерацию работы алгоритма мы сможем получить значения roc auc, а также feature importance. По feature importance будем судить о том, с каким столбцом данных в нашей базе произошли проблемы.

В левой части графиков мы смотрим, как изменилось исходное распределение и течением времени для 6 столбцов данных. По центру мы визуализируем «силу изменения» столбцов — через значения оценки. Правая часть графика показывает общую оценку изменения в данных. 

Видим, что в момент, как только второй столбец по psi пересекает границу «нормальности», значение roc auc по алгоритму adversarial validation уже достигает ~1.0, что показывает более сильную чувствительность данного подхода относительно других. Распределение kstest по столбцам данных очень сильно коррелирует с распределением psi, при этом накопленный итог растет быстрее. 

Что мы увидим, если будет меняться только один столбец данных из шести? Как поведут себя метрики?

Изменение только одного признака из всего набора данных очевидным образом не повлияло на алгоритмы kstest и psi. Метрика roc auc алгоритма adversarial validation в этот раз достигла своего предела немного позже, нежели на предыдущей симуляции. В feature importance у этого алгоритма так же появился некоторый шум по всем признакам, кроме первого. Но, как и в самой первой симуляции, результаты разных алгоритмов получились схожими.

Так почему же мы выбрали adversarial validation? Он обладает рядом неоспоримых преимуществ:

  • лёгкая интерпретируемость — как мы увидели, в алгоритме нужно следить за общей метрикой roc auc и дополнительно отслеживать feature importance для каждого столбца данных. Это стандартные показатели, если вы работали с ML моделями, поэтому их понимание и интерпретация не является сложной задачей

  • возможность обработки большого объема данных за одну итерацию работы алгоритма — под капотом у adversarial validation применяется некоторый adversarial classifier, обычно применяют эффективные реализации градиентных бустингов (catboost, lightgbm, xgboost). Нам достаточно один раз обучить этот классификатор на нашем объёме данных, чтобы получить результат. К тому же вышеупомянутые алгоритмы подлежат распараллеливанию, что даёт прирост в скорости нашей DQ проверки

  • отсутствие проблем с применением алгоритма к данным разного типа — можно без проблем обучать gradient boosting classifier без сложной подготовки данных (или вообще без неё)

Выбранный алгоритм может быть невероятно полезным, но как его применить в процессах, построенных вокруг больших продуктовых hadoop кластеров?

Наш выбор в скорости и качестве — spark udf реализация алгоритма adversarial validation

Нам важно, чтобы процессы были легко масштабируемыми, ведь у нас big data – кластера hadoop — множество регламентных процессов на airflow. Важно, чтобы можно было легко встроить оценку в существующий процесс и не ждать её завершения целую вечность. Это также может быть проблемой, когда у вас много зависимых друг от друга процессов. Поэтому алгоритм adversarial validation был адаптирован под работу через user defined function.

Spark UDF (User Defined Function) — функция, созданная пользователем для обработки данных в Apache Spark. Она позволяет применять пользовательский код к столбцам DataFrame для реализации нестандартных операций обработки данных.

Примерная структура кода представлена ниже. Примерная она просто потому, что может отличаться в реализации в зависимости от ваших особенностей кластера и версий библиотек.

import pandas as pd
from catboost import CatBoostClassifier
from sklearn.metrics import roc_auc_score
from pyspark.sql.types import StructType, StructField, FloatType


def custom_udf(dataframe, **kwargs):
    model = CatBoostClassifier(random_state=15, verbose=0, iterations=20)
    model = model.fit(dataframe[kwargs['features']], dataframe[kwargs['target']])
    
    preds = model.predict_proba(dataframe[kwargs['features']])[:, 1]
    score = roc_auc_score(y_true=dataframe[kwargs['target']], y_pred=preds)
    
    return pd.DataFrame({"score": score}, index=[0])


schema_df_scores = StructType([
    StructField("score", FloatType(), True),
])

result = spark_dataframe.groupBy('batch').applyInPandas(custom_udf, schema_df_scores)

Мы задаём код, который будет отрабатывать в рамках одного батча данных. В этой реализации важно понимать структуру и сам алгоритм. В качестве adversarial classifier выбран CatBoostClassifier. В нём можно использовать разные гиперпараметры в зависимости от ваших задач и целей. Рекомендуем выбрать небольшое значение для iterations, например число 20. Это имеет смысл по ряду причин: 

  • с небольшим количеством iterations мы уменьшаем вероятность переобучиться под данные

  • делая меньше число iterations, мы уменьшаем время работы алгоритма. Помним, что он должен применяться к большому объёму данных

  • наш алгоритм изучает более простые зависимости в данных, что будет плюсом. Наша задача — сделать понятную Data Quality проверку, а не получить наивысшую метрику модели. Поэтому уменьшение числа ведёт к интерпретируемости

Внутри функции custom_udf описывается основная логика алгоритма adversarial validation. Мы должны заранее создать столбец целевой переменной, а также столбец батча — по нему в будущем будет применяться udf-функция. 

Применение алгоритма к данным в spark происходит обычным для spark-udf-функций образом — по столбцу батча. Мы должны задать ожидаемый формат выходных данных и применить функцию к заранее созданному в наборе данных столбцу batch.

Что вернёт эта spark-udf-функция? Dataframe из одного столбца: score. Особенностью будет число строк в этом dataframe. Оно равно числу уникальных значений batch в вашем исходном наборе данных. Это значит, что мы проведём дополнительно некоторую агрегацию результатов, потому что число батчей будет равно минимум двум. А средний случай — будет выбрано столько батчей, сколько необходимо для корректного размещения dataframe в памяти на spark ноде при работе алгоритма.

Как мы внедрили выбранный подход у себя

У нас есть множество процессов для создания витрин данных, на которых обучаются ML-модели. Все они крутятся на airflow и пишут таблицы в spark. Поэтому применение написанной udf очень простое: нужно сделать проверку adversarial validation отдельным шагом DAG, чтобы он сравнивал только что записанные данные с некоторой эталонной партицией.

Результат работы проверки мониторим на вот таких парных heatmap графиках: по графику с метрикой roc auc определяем, есть ли проблема в новом наборе данных. А по графику feature importance определяем столбец данных, с которым произошла проблема. В итоге время на поиск проблем снижается, а данные становится проще мониторить.

Здесь как раз представлен пример оценки одного процесса, когда adversarial validation показал свою эффективность — в один из дней у нас сломалась категориальная фича в таблице — её распределение сильно изменилось, появилось много новых значений, которых не было ранее. 

С таким подходом не нужно придумывать/прописывать индивидуальные проверки, достаточно применить алгоритм к новому источнику данных «из коробки».

По общему значению roc auc, близкому к 1, мы понимаем, что есть сильные изменения в данных, а по feature importance мы понимаем, что привело к такому высокому скору.

Что в итоге?

Этот алгоритм — один из наших подходов к построению Data Quality оценки. Но, конечно же, он не единственный, среди многочисленных DQ проверок, которые стоит применять для мониторинга важных данных. Все тесты и проверки разные — важно, чтобы они покрывали ваши потребности в анализе качества данных. Нам важно, чтобы проверка отрабатывала быстро, а её результаты были интерпретируемы. 

Комментарии (2)


  1. zeaa
    28.05.2024 11:32

    Catboost и xgboost, кажется, имеют спарковские реализации. Почему используете UDF?


    1. MaximML Автор
      28.05.2024 11:32

      Привет, хороший вопрос. Применяя udf в spark, мы можем извлечь любую информацию, дополняя ту, которая уже была упомянута. В контексте той же самой udf мы дополнительно извлекаем: размер батча, вес батча в мегабайтах, доля целевой переменной, некоторые статистики по категориальным переменным. Все это - не ключевая, но тоже важная информация для мониторинга и анализа. Ее быстрее и проще достать за один прогон spark udf функции, а не каждую по отдельности