Привет, Хабр! Меня зовут Максим Шаланкин, я Data Science Team Lead в команде Финтеха Big Data МТС. Из этой статьи вы узнаете, как оценивать качество данных при помощи алгоритма adversarial validation с использованием spark user defined function. В частности, я расскажу, почему такой подход хорошо масштабируется и может быть успешно применен к различным типам данных, становясь универсальным инструментом для встраивания в ваши продуктовые процессы. Все подробности — под катом.
![Обучающая выборка для ML-модели дрейфует в океане неизвестных данных Обучающая выборка для ML-модели дрейфует в океане неизвестных данных](https://habrastorage.org/getpro/habr/upload_files/20b/2aa/6f9/20b2aa6f984bd1b117ce56abf6a82999.png)
В повседневных задачах нам важно понимать, что для ML-моделей нужны самые качественные данные. Поэтому мы применяем эффективную Data Quality проверку на основе алгоритма adversarial validation и spark user defined function.
Статья достаточно большая, но структура не очень сложная:
Из каких подходов DQ мы выбирали.
Сравнение работы этих подходов.
Наш выбор в скорости и качестве — spark udf реализация алгоритма adversarial validation.
Как мы внедрили выбранный подход у себя.
Настоящая сила рождается в голове данных. Она заставляет идти делать DQ, даже когда тело модель хочет упасть. Беар Гриллс.
Из каких подходов DQ мы выбирали
Оценки Data Quality бывают разные, но цель у них одна — понять, что с данными всё в порядке. А если есть какая-то проблема, понять её причину и масштаб. Для этого существует много различных подходов — от самых простых до очень сложных. Каждый выбирает себе тот, который покрывает потребности в решаемых задачах.
Мы в команде финтеха МТС Digital обрабатываем огромные объёмы данных, поэтому важно, чтобы эти данные были надлежащего качества. Цена возможной ошибки растет пропорционально росту объёма данных. Тут и время, которое тратят люди и сервера на обработку данных, и сложности с поиском ошибок из-за объёмов и разной структуры в данных. Проблем становится ещё больше, когда речь заходит о построении ML-моделей. По известному всем правилу «Garbage in, garbage out» качество входных данных определяет качество итоговой модели. Поэтому мы используем большинство известных подходов к оценке качества.
Довольно частая задача — понять, насколько сильно данные за текущую дату (business_dt = t) отличаются от данных за предыдущую дату (business_dt = t-n). Будем смотреть на изменение данных относительно эталонной даты при помощи следующих алгоритмов (на примере numeric данных):
Population stability index (psi)
Kolmogorov-smirnov statistic (kstest)
Adversarial Validation
Это далеко не исчерпывающий список возможных проверок данных, но он частично отражает то, чем пользуемся мы в своей ежедневной работе.
Вот небольшие вводные по каждому из этих способов оценки данных:
Метрика Population Stability Index (PSI) предназначена для измерения изменений в распределении переменной между двумя выборками в разные периоды времени.
![](https://habrastorage.org/getpro/habr/upload_files/6f8/3dc/819/6f83dc819ab84ab205005c6cafd156e0.png)
Эта расчётная величина показывает различия в исходных данных, которые разделили на бины вручную (над непрерывными данными), или же они уже были поделены таким образом заранее (категориальные данные).
Критерий Колмогорова-Смирнова — статистический метод служит для определения, принадлежит ли выборка известному закону распределения. Путём сравнения кумулятивных функций распределения и анализа максимального вертикального расхождения можно оценить соответствие выборки теоретической модели. Критерий чувствителен к отклонениям как в центре, так и на краях распределения, обеспечивая более всестороннюю проверку соответствия выборки закону распределения.
![](https://habrastorage.org/getpro/habr/upload_files/ec7/8f6/21f/ec78f621fa85bf70261169b50902cc45.png)
Из этого критерия нам интересна величина статистики — точка, в которой сумма накопленных расхождений между двумя распределениями является наибольшей.
Мы будем сравнивать два непрерывных распределения данных и оценивать их изменение.
Алгоритм adversarial validation — это подход в машинном обучении, который позволяет оценить, насколько две выборки данных похожи друг на друга с использованием ML-модели. Если модель успешно различает две выборки, это указывает на потенциальные различия между ними.
![](https://habrastorage.org/getpro/habr/upload_files/2cd/386/465/2cd38646530e10540f83071b3a4f9c3d.png)
Adversarial Validation — самый интересный подход, когда дело касается оценки изменений в большом объёме данных. В качестве алгоритма adversarial classifier удобнее всего брать градиентный бустинг. Алгоритмы градиентного бустинга очень хорошо параллелятся. Бустинг лучше других алгоритмов работает с необработанными данными — мы не тратим время на предобработку (или делаем её по минимуму), а подаём данные в исходном виде (в том, в котором они были записаны в базу данных).
Кстати, мы ранее уже рассказывали про этот алгоритм, почитать статью можно тут.
Сравнение работы подходов
На этих трёх критериях мы проведём оценку ситуаций, которые могут возникнуть с вашей обучающей выборкой для ML-модели в реальной жизни. Мы рассмотрим такие кейсы, как:
изменение распределения данных
перекосы в данных
появление равномерных пропусков в данных
появление экстремальных пропусков в данных
После этого посмотрим на проблему в комплексной ситуации, когда меняется более, чем один признак. Мы хотим сравнить эти оценки между собой при однотипном изменении данных. Нам нужно разобраться в том, как каждый из этих критериев меняется при изменении данных.
На графиках ниже симулируем поэтапное изменение данных — каждый новый день данные немного отличаются от начального дня наблюдений. Для каждого нового дня мы считаем наши оценки и визуализируем их. Метрика roc auc оценивает нашу adversarial validation модель, kstest показывает величину statistic, а psi — само значение psi.
![Проверяем распространённую проблему — изменение вида распределения данных. Проверяем распространённую проблему — изменение вида распределения данных.](https://habrastorage.org/getpro/habr/upload_files/e0c/546/60e/e0c54660eebd59beb023fe411db25cc3.gif)
По изменениям графика выше мы можем сделать следующие выводы: все алгоритмы справляются с поставленной задачей — каждая из трёх метрик растёт вместе с счётчиком числа дней по мере того, как исходное распределение становится всё менее похожим на актуальное. Значит, это посильная задача для трёх наших алгоритмов — поймать момент, когда нужно бить тревогу и говорить, что произошел data shift.
Проверим наши алгоритмы на другой задаче — данные меняются менее предсказуемым образом. Посмотрим на более нетипичное изменение исходных данных, которое тем не менее вполне вероятно может произойти в обычной жизни. Цель всё та же — сравнить работу алгоритмов между собой и понять, как каждый из них реагирует на изменения разного вида.
![](https://habrastorage.org/getpro/habr/upload_files/f4e/349/085/f4e349085d60e60da5dc44ca3d2e56bd.gif)
Здесь мы видим схожие паттерны роста метрик, но есть и отличия. Roc auc и psi примерно одновременно пробивают свои «тревожные границы», но psi в определенный момент (примерно на 28 день) возвращается к зоне «нормальности», когда мы можем говорить, что в данных отличий нет. Алгоритм adversarial validation так себя не ведёт.
Проверим срабатывание наших проверок при равномерном появлении пропусков в данных.
![](https://habrastorage.org/getpro/habr/upload_files/9d9/066/a67/9d9066a67575a44188d5b09fe3673235.gif)
Видно, что каждый критерий, как и в первом случае, меняется с ростом числа дней, но psi пробивает границу слишком поздно, нежели adversarial validation. По темпам роста можно сказать, что roc auc растёт быстрее остальных метрик, у kstest рост более линейный, а psi явно запаздывает и достигает критической отметки только к концу наблюдаемого периода.
Следующая потенциальная проблема — пропуски в данных будут появляются с конца распределения.
![](https://habrastorage.org/getpro/habr/upload_files/e32/1aa/f41/e321aaf4173b6649bcb9ebca57b474bc.gif)
В этом кейсе psi показал себя лучше, так как adversarial validation преодолел черту сильно позже.
При разборе этих симуляций я почти не затронул график метрики kstest. Причина кроется в отсутствии какого-либо общепринятого уровня, после достижения которого нам следует бить тревогу. Он может сильно меняться с изменением данных — и его нужно каждый раз интерпретировать и пересматривать. Тем не менее, как мы уже увидели, kstest хорошо коррелирует с другими метриками, поэтому мы можем рассматривать его, как альтернативу.
В реальной жизни данные редко меняются по одному признаку за раз. Они могут меняться одновременно и динамически. Как правило, мы наблюдаем сложную цепочку изменений, где могут быть неочевидные зависимости. Поэтому и DQ к данным нужно применять комплексно — с целью отследить процесс изменения исходного набора данных.
Проведём очередную симуляцию, но на этот раз будем менять шесть столбцов данных за раз. И kstest, и psi умеют работать одновременно только с одним столбцом, поэтому будем примять их последовательно, а результаты суммировать. С adversarial validation таких проблем нет, поэтому за одну итерацию работы алгоритма мы сможем получить значения roc auc, а также feature importance. По feature importance будем судить о том, с каким столбцом данных в нашей базе произошли проблемы.
![](https://habrastorage.org/getpro/habr/upload_files/4ca/0c9/ad0/4ca0c9ad02c9dcc579e9d211be62effd.gif)
В левой части графиков мы смотрим, как изменилось исходное распределение и течением времени для 6 столбцов данных. По центру мы визуализируем «силу изменения» столбцов — через значения оценки. Правая часть графика показывает общую оценку изменения в данных.
Видим, что в момент, как только второй столбец по psi пересекает границу «нормальности», значение roc auc по алгоритму adversarial validation уже достигает ~1.0, что показывает более сильную чувствительность данного подхода относительно других. Распределение kstest по столбцам данных очень сильно коррелирует с распределением psi, при этом накопленный итог растет быстрее.
Что мы увидим, если будет меняться только один столбец данных из шести? Как поведут себя метрики?
![](https://habrastorage.org/getpro/habr/upload_files/7a7/0d6/cb9/7a70d6cb9180f27dc7db9d7607a0645d.gif)
Изменение только одного признака из всего набора данных очевидным образом не повлияло на алгоритмы kstest и psi. Метрика roc auc алгоритма adversarial validation в этот раз достигла своего предела немного позже, нежели на предыдущей симуляции. В feature importance у этого алгоритма так же появился некоторый шум по всем признакам, кроме первого. Но, как и в самой первой симуляции, результаты разных алгоритмов получились схожими.
Так почему же мы выбрали adversarial validation? Он обладает рядом неоспоримых преимуществ:
лёгкая интерпретируемость — как мы увидели, в алгоритме нужно следить за общей метрикой roc auc и дополнительно отслеживать feature importance для каждого столбца данных. Это стандартные показатели, если вы работали с ML моделями, поэтому их понимание и интерпретация не является сложной задачей
возможность обработки большого объема данных за одну итерацию работы алгоритма — под капотом у adversarial validation применяется некоторый adversarial classifier, обычно применяют эффективные реализации градиентных бустингов (catboost, lightgbm, xgboost). Нам достаточно один раз обучить этот классификатор на нашем объёме данных, чтобы получить результат. К тому же вышеупомянутые алгоритмы подлежат распараллеливанию, что даёт прирост в скорости нашей DQ проверки
отсутствие проблем с применением алгоритма к данным разного типа — можно без проблем обучать gradient boosting classifier без сложной подготовки данных (или вообще без неё)
Выбранный алгоритм может быть невероятно полезным, но как его применить в процессах, построенных вокруг больших продуктовых hadoop кластеров?
Наш выбор в скорости и качестве — spark udf реализация алгоритма adversarial validation
Нам важно, чтобы процессы были легко масштабируемыми, ведь у нас big data – кластера hadoop — множество регламентных процессов на airflow. Важно, чтобы можно было легко встроить оценку в существующий процесс и не ждать её завершения целую вечность. Это также может быть проблемой, когда у вас много зависимых друг от друга процессов. Поэтому алгоритм adversarial validation был адаптирован под работу через user defined function.
Spark UDF (User Defined Function) — функция, созданная пользователем для обработки данных в Apache Spark. Она позволяет применять пользовательский код к столбцам DataFrame для реализации нестандартных операций обработки данных.
Примерная структура кода представлена ниже. Примерная она просто потому, что может отличаться в реализации в зависимости от ваших особенностей кластера и версий библиотек.
import pandas as pd
from catboost import CatBoostClassifier
from sklearn.metrics import roc_auc_score
from pyspark.sql.types import StructType, StructField, FloatType
def custom_udf(dataframe, **kwargs):
model = CatBoostClassifier(random_state=15, verbose=0, iterations=20)
model = model.fit(dataframe[kwargs['features']], dataframe[kwargs['target']])
preds = model.predict_proba(dataframe[kwargs['features']])[:, 1]
score = roc_auc_score(y_true=dataframe[kwargs['target']], y_pred=preds)
return pd.DataFrame({"score": score}, index=[0])
schema_df_scores = StructType([
StructField("score", FloatType(), True),
])
result = spark_dataframe.groupBy('batch').applyInPandas(custom_udf, schema_df_scores)
Мы задаём код, который будет отрабатывать в рамках одного батча данных. В этой реализации важно понимать структуру и сам алгоритм. В качестве adversarial classifier выбран CatBoostClassifier. В нём можно использовать разные гиперпараметры в зависимости от ваших задач и целей. Рекомендуем выбрать небольшое значение для iterations, например число 20. Это имеет смысл по ряду причин:
с небольшим количеством iterations мы уменьшаем вероятность переобучиться под данные
делая меньше число iterations, мы уменьшаем время работы алгоритма. Помним, что он должен применяться к большому объёму данных
наш алгоритм изучает более простые зависимости в данных, что будет плюсом. Наша задача — сделать понятную Data Quality проверку, а не получить наивысшую метрику модели. Поэтому уменьшение числа ведёт к интерпретируемости
Внутри функции custom_udf описывается основная логика алгоритма adversarial validation. Мы должны заранее создать столбец целевой переменной, а также столбец батча — по нему в будущем будет применяться udf-функция.
Применение алгоритма к данным в spark происходит обычным для spark-udf-функций образом — по столбцу батча. Мы должны задать ожидаемый формат выходных данных и применить функцию к заранее созданному в наборе данных столбцу batch.
Что вернёт эта spark-udf-функция? Dataframe из одного столбца: score. Особенностью будет число строк в этом dataframe. Оно равно числу уникальных значений batch в вашем исходном наборе данных. Это значит, что мы проведём дополнительно некоторую агрегацию результатов, потому что число батчей будет равно минимум двум. А средний случай — будет выбрано столько батчей, сколько необходимо для корректного размещения dataframe в памяти на spark ноде при работе алгоритма.
Как мы внедрили выбранный подход у себя
У нас есть множество процессов для создания витрин данных, на которых обучаются ML-модели. Все они крутятся на airflow и пишут таблицы в spark. Поэтому применение написанной udf очень простое: нужно сделать проверку adversarial validation отдельным шагом DAG, чтобы он сравнивал только что записанные данные с некоторой эталонной партицией.
![](https://habrastorage.org/getpro/habr/upload_files/533/26d/492/53326d49222a5cb64a85a1806829ba99.png)
Результат работы проверки мониторим на вот таких парных heatmap графиках: по графику с метрикой roc auc определяем, есть ли проблема в новом наборе данных. А по графику feature importance определяем столбец данных, с которым произошла проблема. В итоге время на поиск проблем снижается, а данные становится проще мониторить.
![](https://habrastorage.org/getpro/habr/upload_files/321/a4d/b59/321a4db59ec5a6312ef0f92b3fcc5922.png)
Здесь как раз представлен пример оценки одного процесса, когда adversarial validation показал свою эффективность — в один из дней у нас сломалась категориальная фича в таблице — её распределение сильно изменилось, появилось много новых значений, которых не было ранее.
С таким подходом не нужно придумывать/прописывать индивидуальные проверки, достаточно применить алгоритм к новому источнику данных «из коробки».
По общему значению roc auc, близкому к 1, мы понимаем, что есть сильные изменения в данных, а по feature importance мы понимаем, что привело к такому высокому скору.
Что в итоге?
Этот алгоритм — один из наших подходов к построению Data Quality оценки. Но, конечно же, он не единственный, среди многочисленных DQ проверок, которые стоит применять для мониторинга важных данных. Все тесты и проверки разные — важно, чтобы они покрывали ваши потребности в анализе качества данных. Нам важно, чтобы проверка отрабатывала быстро, а её результаты были интерпретируемы.
zeaa
Catboost и xgboost, кажется, имеют спарковские реализации. Почему используете UDF?
MaximML Автор
Привет, хороший вопрос. Применяя udf в spark, мы можем извлечь любую информацию, дополняя ту, которая уже была упомянута. В контексте той же самой udf мы дополнительно извлекаем: размер батча, вес батча в мегабайтах, доля целевой переменной, некоторые статистики по категориальным переменным. Все это - не ключевая, но тоже важная информация для мониторинга и анализа. Ее быстрее и проще достать за один прогон spark udf функции, а не каждую по отдельности