Привет, Хабр!

Сегодня я расскажу про библиотеку Python River, которая позволяет обучать модели машинного обучения в потоковом режиме.

В классическом варианте мы собираем весь датасет целиком, делим на обучающую и тестовую выборки, обучаем модель, измеряем качество — и внедряем в прод. Здорово, если данных немного и они разом доступны. А если данные льются непрерывно?

Новые пользователи приходят каждый день, события генерируются каждую секунду. Модель в продакшене устаревает, если не переучивать её регулярно. Переобучение с нуля нарастающим объёмам данных — удовольствие ниже среднего: долго, ресурсозатратно, да и не всегда возможно, если данные бесконечны (например, поток кликов или показателей датчиков).

Тут поможет потоковое (инкрементальное) машинное обучение. Идея проста: обучаем модель понемногу, шаг за шагом, каждый раз на новом поступившем примере. В идеале модель учится постоянно, без полного переразметки на всём датасете. Такой подход решает несколько проблем разом:

  • Ограниченная память — не нужно держать весь датасет, достаточно обрабатывать поступающие элементы;

  • Адаптивность — модель самостоятельно подстраивается под новые данные, переживает дрейф данных лучше, потому что сразу учится на свежем материале;

  • Скорость обучения — каждый шаг обучения быстрый, ведь мы обновляем веса модели всего по одному (или нескольким) объектам, а не перегоняем гигабайты данных скопом.

Конечно, потоковое обучение нужно не всегда. Если данные статичны или поступают очень медленно, batch‑подход всех устраивает.

Знакомство с библиотекой River

Библиотека River создана специально для онлайн‑обучения моделей на потоках данных. Это результат объединения проектов Creme и scikit‑multiflow — разработчики собрали лучшие идеи для работы с непрерывными данными. River полностью на Python и органично вписывается в привычную экосистему.

Устанавливается через пип:

pip install river

Что умеет River? Почти всё, что и классический ML‑инструментарий, только в режиме стрима:

  • Классификация и регрессия (есть линейные модели, деревья решений, ближайшие соседи и другие алгоритмы, адаптированные под поэлементное обучение);

  • Анomaly detection (выявление выбросов на ходу);

  • Обнаружение дрейфа данных (специальные детекторы типа ADWIN, DDM и др. сигнализируют, когда распределение данных изменилось);

  • Рекомендательные системы, кластеризация, обработка текстов, прогнозирование временных рядов — библиотека универсальна и покрывает многие задачи;

  • Метрики качества и валидация на потоке (можно оценивать модель постепенно, без отдельного тестового набора, считая метрику после каждого шага обучения, так называемая прогрессивная оценка).

River изначально заточен под обработку одного объекта за раз. Тут нет привычной векторизации NumPy: вместо массивов используется словарь признаков для каждого примера. Это сделано умышленно, когда обрабатываешь элементы по одному, накладные расходы на создание больших массивов ни к чему, обычные питоновские структуры работают быстрее. Зато можно передавать в модель объекты с нестандартными признаками, даже если где‑то пропали или появились новые фичи.

Обучаем модель на потоковых данных

Попробуем что‑нибудь обучить с помощью River. Библиотека имеет модуль river.datasets с несколькими встроенными потоковыми датасетами для экспериментов. Возьмём, к примеру, набор Phishing, это данные о веб‑страницах, где каждая строчка описывает сайт и помечена, является ли он фишинговым. Этот датасет хранится сжатым в поставке River и загружается особым образом, не целиком в память, а строчка за строчкой.

from river import datasets

dataset = datasets.Phishing() 
print(dataset)  

Так как dataset является итерируемым объектом, мы можем проходить по нему в цикле for и на каждой итерации получать отдельный пример:

for x, y in dataset:
    # x - словарь признаков для одного сайта
    # y - целевая переменная (True/False, фишинговый ли сайт)
    print(x)
    print(y)
    break  # выходим после первого примера для демонстрации

В консоли увидим что‑то вроде:

{'empty_server_form_handler': 1.0, 'popup_window': 0.5, 'https': 1.0, ... 'ip_in_url': 0}
False

Каждый сайт описан набором признаков (например, есть ли всплывающие окна, используется ли HTTPS, присутствует ли IP‑адрес в URL, и так далее), а y=False означает, что первый сайт из датасета не является фишинговым.

Теперь создадим простейшую модель — логистическую регрессию, и будем обучать её по одному примеру. В River для каждого алгоритма реализован метод learn_one(x, y), который учит модель на одном экземпляре с признаками x и ответом y.

from river import linear_model

model = linear_model.LogisticRegression()  # наша модель для классификации

for x, y in dataset:
    model.learn_one(x, y)  # обновляем модель на каждом новом примере

Вот и всё, модель постепенно просмотрела все примеры и обновила свои весовые коэффициенты для каждого. Ни одного вызова fit на всём наборе сразу — только последовательное learn_one. Такая модель уже умеет делать прогнозы, метод predict_one(x) вернёт предсказанный класс, а predict_proba_one(x) — вероятности классов.

Перепишем цикл с учётом этого:

from river import metrics

model = linear_model.LogisticRegression()
metric = metrics.ROCAUC()  # возьмём метрику качества ROC AUC для демонстрации

for x, y in dataset:
    y_pred = model.predict_proba_one(x)       # (1) модель предсказывает вероятность "фишинговости"
    model.learn_one(x, y)                    # (2) обновляем модель на истинном ответе
    metric.update(y, y_pred)                 # (3) обновляем метрику качества

После такого обучения метрика ROCAUC будет содержать значение качества модели после прохода по всему стриму. Например, можно вывести print(metric) и увидеть что‑то вроде ROCAUC: 0.89 (89% площадью под ROC‑кривой). Это и есть оценка модели, полученная прогрессивно, то есть модель постоянно оценивалась на новых данных до их обучени.

Пайплайны и преобразование признаков

Вы могли заметить, что логистическая регрессия без обработки признаков — штука простая и может работать неидеально, если признаки не отнормированы. В batch мы бы применили StandardScaler или другое преобразование ко всем данным. В River тоже можно это сделать, только в онлайн‑режиме,у библиотеки есть модуль preprocessing с различными трансформерами (Normalizer, StandardScaler и так далее), а объединять их с моделью помогает класс compose.Pipeline. Он напоминает sklearn.pipeline.Pipeline, только вместо последовательного fit_transform у него последовательный вызов learn_one и transform_one для каждого шага.

Улучшим нашу модель, добавив масштабирование признаков:

from river import compose, preprocessing, linear_model

model = compose.Pipeline(
    preprocessing.StandardScaler(),        # шаг 1: стандартизация признаков (скользящее среднее и variance)
    linear_model.LogisticRegression()      # шаг 2: сама модель
)

# Обучаем pipeline так же, как раньше
metric = metrics.ROCAUC()

for x, y in datasets.Phishing():
    y_pred = model.predict_proba_one(x)
    model.learn_one(x, y)       # внутри pipeline каждый новый x пройдет через StandardScaler, затем в модель
    metric.update(y, y_pred)

print(metric)  
# Например: ROCAUC: 0.95

Как видим, добавление стандартизатора заметно повысило качество. Что произошло внутри: StandardScaler в режиме онлайн на каждом шаге обновляет среднее и стандартное отклонение для каждой фичи и нормирует текущий пример x перед тем, как передать его в логистическую регрессию. То есть признаки масштабируются по мере поступления данных, используя информацию, накопленную на предыдущих примерах. В результате модель получает адекватно отнормированные данные даже без взгляда на весь датасет сразу.

Pipeline в River может состоять из множества этапов, можно делать разветвления (например, часть признаков обрабатывать одним способом, часть другим) и объединять обратно, для этого есть утилиты вроде TransformerUnion. Всё это работает в потоковом режиме, вызывая соответствующие методы .learn_one / .transform_one у каждого шага.

Изменение данных и дрейф

Одной из фич потоковых моделей является способность меняться вместе с данными.

Данные не статичны: поведение пользователей меняется, финансовые показатели плавают, слова в трендах приходят и уходят. Со временем это приводит к дрейфу концепции — модель, обученная на старых данных, начинает хуже предсказывать новые. River содержит методы для обнаружения такого дрейфа (например, river.drift.ADWIN и другие). Их можно прикрепить к модели и мониторить, не пора ли обновить алгоритм или параметры. Некоторые модели в River вообще сами по себе адаптивны: например, дерево Hoeffding Tree может менять структуру, ансамбли вроде Adaptive Random Forest — пересбрасывать слабые модели, если меняется распределение.

Однако, не стоит думать, что River заменит весь ваш ML‑стек. Потоковое обучение имеет смысл далеко не во всех проектах. Если данные можно раз в день собрать и переработать пачкой — иногда это проще и результат надёжнее.

С нуля до Middle ML инженера (Data Scientist) --> онлайн-курс
С нуля до Middle ML инженера (Data Scientist) --> онлайн-курс

Если ML у вас живёт в проде, а данные меняются каждый день — эти открытые уроки помогут закрепить тему потокового обучения на практике: окружение, аномалии и модели в динамичной среде.

  • 14 января 18:00. «Локальное окружение для начинающего ML-инженера». Записаться

  • 21 января 20:00. «Ищем аномалии в данных: от простых графиков до машинного обучения». Записаться

  • 22 января 20:00. «FinRL — фреймворк RL для построения торговых агентов». Записаться

Комментарии (0)