Привет, Хабр!
Сегодня расскажем, как с помощью Dask можно анализировать временные ряды. С временными рядами всегда заморочек много: большие данные, сложные расчеты. Но Dask отлично с этим справляется.
Для начала установим Dask:
!pip install dask[complete]
Не забудьте проверить, чтобы все установилось правильно. У Dask много зависимостей, и конфликты иногда случаются.
Загрузка и предобработка временных данных с Dask DF
Загрузим условно большой CSV-файл с временными рядами, например, данные по продажам за несколько лет.
import dask.dataframe as dd
df = dd.read_csv('large_sales_data.csv', parse_dates=['Date'], blocksize='64MB')
Dask read_csv
не загружает файл целиком в память. Он разбивает данные на чанки и загружает их по требованию, что позволяет обрабатывать действительно большие файлы.
Правильно выбранный размер blocksize
(например, 64MB
или 128MB
) очень важен. Подбирайте его в зависимости от объема оперативной памяти.
Фильтрация данных и работа с пропусками
Сначала выберем данные только за последние три года и заполним пропущенные значения.
df = df[df['Date'] >= '2024-01-01']
df['Sales'] = df['Sales'].fillna(0)
Если у вас много категориальных данных, подумайте о предварительном преобразовании в category
.
Агрегация временных рядов
Dask поддерживает большинство агрегационных функций Pandas, и это одна из его главных фич. Рассчитаем средние продажи по дням.
daily_sales = df.groupby(df['Date'].dt.date).Sales.mean()
daily_sales = daily_sales.compute() # выполняем вычисления
Чтобы повысить производительность агрегации, экспериментируйте с функциями split_out
и split_every
— они позволяют оптимизировать параллельные вычисления по разным разделам данных.
Расчет скользящего среднего с Dask
Dask поддерживает функции скользящих окон, но с учетом некоторых ограничений: данные должны быть отсортированы по времени.
df['Sales_Rolling_Mean'] = df['Sales'].rolling(window=7).mean()
print(df.head().compute())
Скользящее окно лучше всего подходит для небольших временных интервалов, иначе вычисления становятся ресурсоемкими. Если нужно более крупное окно, например, на месячной или годовой основе, лучше использовать тот же downsampling с последующей агрегацией.
Управление чанками и партициями
Размер чанков — вопрос ресурсов, поэтому подбор оптимального размера важен.
df = df.repartition(npartitions=10)
Можно попробовать rechunk
с параметром chunksize
на сложных операциях, чтобы Dask мог оптимально разбивать данные по потокам, минимизируя нагрузку на память.
Прогнозирование с Dask и dask-ml
Dask интегрирован с dask-ml
, что позволяет строить масштабируемые модели машинного обучения. Разделим данные на обучающую и тестовую выборки.
from dask_ml.model_selection import train_test_split
X = df[['Date']].values.reshape(-1, 1)
y = df['Sales'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
Для построения модели используем LinearRegression
из dask-ml
.
from dask_ml.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(y_pred.compute())
На заметку: Dask поддерживает Incremental
модели.
Оптимизация и кэширование результатов
Когда одна и та же операция используется несколько раз, имеет смысл закэшировать результат.
cached_result = df['Sales'].mean().persist()
print(cached_result.compute())
Кэшированные данные хранятся в оперативной памяти, поэтому используйте этот подход осторожно при работе с большими данными.
Визуализация
Dask работает с matplotlib
и seaborn
, имяя дефолтный интерфейс .plot()
.
import matplotlib.pyplot as plt
daily_sales.plot(figsize=(10, 5))
plt.title("Средние продажи по дням")
plt.xlabel("Дата")
plt.ylabel("Продажи")
plt.show()
Для работы с выборками данных удобно использовать .sample()
:
df_sample = df.sample(frac=0.1).compute()
df_sample.plot()
Чтобы сохранить память при визуализации больших данных, используйте datashader
.
Dask — это шикарный инструмент для больших временных рядов, особенно когда Pandas не справляется из-за объема данных. Если у вас есть вопросы или чем поделиться, пишите в комментариях.
А всем, кому интересен системный и бизнес-анализ, рекомендую обратить внимание на открытые уроки по темам:
akakoychenko
Не имею ничего против самой технологии, но, собеседуя ML-инженеров, заметил какую особенность: опыт работы с dask сильно коррелирует со слабыми знаниями SQL.
На первый взгляд, идея пандаса на стероидах, который позволит взять тестовый файлик на 10 МБ, написать скрипт в питоне, а потом просто взять и запустить его на 10ТБ, ничего не меняя, выглядит заманчиво. Примерно, как идея сделать облачный Excel для больших данных, уверен, зайдет многим менеджерам среднего звена:)
Более того, идея быть уважаемым высокоплачиваемым ML-инженером, не зная SQL, выглядит заманчиво тоже.
Лично для меня, все же, Dask/H2O/любой другой турбо-датафрейм с Pandas-совместимой сигнатурой выглядит какой-то тупиковой веткой развития. Pandas изначально пошёл путем, который не дружит с большими данными. Его императивная парадигма даёт куда меньше свободы оптимизатору, чем декларативный SQL. А использование аналитической БД для подготовки данных, а Pandas лишь, как клея, который позволяет приклеить result set к библиотекам ML/визуализации, выглядит, как то, что отлично работает, и не требует починки
CrazyElf
Если уж большие данные, то что-то среднее между
Pandas
иSQL
- это будетSpark
тогда.Dask
как-то ни туда ни сюда. Сколько с ним пытался работать, если память есть, то иPandas
справится, а если памяти нет, то иDask
не поможет, обязательно какие-то косяки будут. Ну и потом жеVaex
придумали, ещё лучше, чемDask
, и какие-то ещё либы наподобие.akakoychenko
Думаете, таки Pandas < Spark < SQL?
Как будто бы, спарк куда более хардкорная и требовательная как к компетенции, так и к необходимому времени и внимании на написание кода, штука, чем SQL, и к нему следует прибегнуть, когда SQL исчерпал свою гибкость, разве нет?
CrazyElf
Эээ, ну может
Spark
для не-SQL
баз. Или для смешанных БД. Ну, да, наверное вы правы,Spark
где-то ещё выше должен лежать как надстройка над разными видами источников данных, включаяrelational DB
.