Всем привет! Меня зовут Александр Андреев, я инженер данных. Сегодня я хочу рассказать вам о библиотеке Polars - потенциальной замене Pandas, любимой у большинства дата-инженеров и дата-саентистов библиотеки для работы с данными.

В своей статье я последовательно пройдусь от истории библиотеки Polars до примеров кода, технических аспектов ее производительности и в конце дам ссылки на все бенчмарки и дополнительные статьи, которые использовались для написания данной статьи.

Введение: откуда и зачем

В экосистеме Python для обработки данных долгие годы безраздельно господствовал Pandas. Эта библиотека стала де-факто стандартом для анализа данных, обработки табличной информации и построения ETL-пайплайнов. Однако с ростом объемов данных и повышением требований к производительности, ограничения Pandas становились все более очевидными. Именно в этот момент на сцену вышел Polars — библиотека, которая обещает высокую производительность и при этом сохраняет привычный и интуитивный API.

История Polars началась в 2020 году, когда Ричи Винк (Ritchie Vink) решил создать библиотеку для работы с данными, которая бы использовала все преимущества современного железа и не была ограничена легаси-решениями. За пять лет проект набрал более 34000 звезд на GitHub, и ныне популярность этой библиотеки стремительно возрастает.

Что такое Polars и почему он такой быстрый

Polars представляет собой dataframe-библиотеку с OLAP Query Engine, реализованным на языке программирования Rust с использованием колоночного формата Apache Arrow в качестве модели памяти.

Rust под капотом играет критическую роль в архитектуре Polars. В отличие от Pandas, который построен поверх NumPy и ограничен особенностями работы Python с памятью, Polars полностью контролирует все аспекты управления данными на системном уровне. Rust компилируется в машинный код без необходимости интерпретации, обеспечивает безопасную многопоточность и эффективное управление памятью без сборщика мусора. Это позволяет Polars достигать производительности, сравнимой с C и C++, сохраняя при этом безопасность памяти и предсказуемость поведения.

Колоночное хранение данных через Apache Arrow обеспечивает несколько ключевых преимуществ. Во-первых, данные одного типа хранятся последовательно в памяти, что позволяет процессору эффективно использовать кэш и применять SIMD-инструкции для векторизованных операций. Во-вторых, Arrow обеспечивает стандартизированный формат данных в памяти, что позволяет осуществлять zero-copy обмен данными между различными системами и библиотеками. В-третьих, Arrow нативно поддерживает широкий спектр типов данных, включая вложенные структуры, что делает Polars более гибким в работе со сложными данными.

Параллелизм в Polars реализован на фундаментальном уровне. В то время как Pandas работает в однопоточном режиме и требует дополнительных библиотек вроде Dask для распараллеливания, Polars автоматически использует все доступные ядра процессора для выполнения операций. Ричи Винк описывает производительность Polars как "embarrassingly parallel" — библиотека может безопасно распараллелить даже сложные запросы с множеством колонок без дополнительной настройки со стороны пользователя.

Ленивые вычисления - основа быстродействия

Одной из самых мощных особенностей Polars является поддержка двух режимов выполнения: eager (немедленного) и lazy (ленивого). В то время как eager API работает подобно Pandas, выполняя операции сразу при вызове, lazy API позволяет построить граф вычислений, который затем оптимизируется перед выполнением.

# Пример кода:

# Eager выполнение (как в Pandas)
df = pl.read_csv("large_file.csv")
filtered = df.filter(pl.col("amount") > 100)
result = filtered.group_by("category").agg(pl.col("amount").sum())

# Lazy выполнение с оптимизацией
result = (
    pl.scan_csv("large_file.csv")  # Не загружает файл сразу
    .filter(pl.col("amount") > 100)
    .group_by("category")
    .agg(pl.col("amount").sum())
    .collect()  # Только здесь происходит выполнение
)

Оптимизатор запросов в lazy режиме анализирует весь план выполнения и применяет множество оптимизаций. Например, если вы сначала выполняете группировку по всему датафрейму, а затем фильтруете результат только по определенным категориям, оптимизатор переставит операции местами, сначала отфильтровав нужные данные, а затем выполнив группировку только по ним. Это может сократить объем обрабатываемых данных в десятки раз. Кроме того, оптимизатор устраняет избыточные вычисления, объединяет последовательные операции в одну и выбирает наиболее эффективные алгоритмы для конкретных паттернов запросов.

Существует несколько техник оптимизации запросов в lazy режиме:

Predicate Pushdown — фильтры проталкиваются как можно ближе к источнику данных. Если вы читаете Parquet файл и применяете фильтр, Polars может прочитать только нужные row groups, даже не загружая остальные данные в память.

Projection Pushdown — если в конечном результате нужны только определенные колонки, Polars не будет загружать остальные колонки из источника данных, экономя память и время.

Common Subexpression Elimination — если одно и то же выражение используется несколько раз, оно будет вычислено только один раз.

Query Reordering — операции переставляются для минимизации объема обрабатываемых данных. Например, фильтрация будет выполнена до группировки, чтобы уменьшить количество групп.

Для визуализации плана выполнения можно использовать широко известный метод explain().

Также отдельно следует рассказать о streaming engine, который представлен в последних версиях Polars и позволяет обрабатывать данные, которые не помещаются в оперативную память. В отличие от in-memory engine, который загружает все данные в память, streaming engine обрабатывает данные порционно (условными "чанками"), что позволяет работать с датасетами размером в сотни гигабайт на обычной машине.

# Включение streaming engine
pl.Config.set_streaming_chunk_size(10_000)  # Размер чанка

# Обработка огромного файла
result = (
    pl.scan_csv("100gb_file.csv")
    .filter(pl.col("status") == "active")
    .group_by("region")
    .agg(pl.col("revenue").sum())
    .collect(streaming=True)  # Активация streaming режима
)

Реальные цифры: бенчмарки и сравнения

Примечание: в конце статьи я привожу много ссылок на самые разные бенчмарки по принципу "Polars против конкурентов", а здесь дам краткую выжимку из всех изученных мною публичных тестов.

Итак, рассмотрим результаты тестирования на различных операциях и размерах данных.

На официальных PDS-H бенчмарках (производных от TPC-H), которые моделируют реальные аналитические нагрузки, Polars показывает впечатляющие результаты. На датасетах масштаба SF-10 (около 10 ГБ данных) Polars streaming engine и DuckDB показывают схожую производительность, опережая другие решения в несколько раз. Интересно, что с увеличением размера данных до SF-100 преимущество streaming engine становится еще более выраженным благодаря эффективному использованию CPU-кэша и избеганию проблем с memory-bound операциями.

На типичных операциях обработки данных Polars опережает Pandas в 5-10 раз на средних датасетах и до 30 раз на больших объемах данных. Например, на датасете Covertype из scikit-learn с 581,012 строками операции агрегации в Polars выполнялись в 22 раза быстрее, чем в Pandas. Операции группировки показали ускорение в 2.6 раза, а сортировка была быстрее в 11.7 раз.

Интересная закономерность проявляется при сравнении производительности на датасетах разного размера:

Маленькие датасеты (< 10,000 строк): Pandas иногда может быть быстрее на простых операциях благодаря меньшему overhead на инициализацию. Разница обычно составляет миллисекунды и незаметна для пользователя.

Средние датасеты (10,000 - 1,000,000 строк): Polars начинает показывать преимущество в 2-5 раз на большинстве операций. Особенно заметна разница на группировках и join'ах.

Большие датасеты (> 1,000,000 строк): Преимущество Polars становится подавляющим — ускорение в 10-30 раз становится нормой. На датасетах в десятки миллионов строк Pandas может просто не справиться из-за ограничений памяти, в то время как Polars продолжает работать эффективно.

Очень большие датасеты (> RAM): Здесь Polars с streaming engine показывает уникальную способность работать с данными, превышающими объем оперативной памяти, что для Pandas просто невозможно без использования дополнительных инструментов вроде Dask.

Итого, на маленьких датасетах (менее 10,000 строк) Pandas иногда может быть быстрее на простых операциях фильтрации благодаря меньшему overhead на инициализацию. Однако на операциях, требующих сложных вычислений или работы с большими объемами данных, преимущество Polars становится подавляющим.

Polars vs Pandas: детальное сравнение

Архитектурные различия между Pandas и Polars фундаментальны. Pandas построен на NumPy и наследует его ограничения, особенно в работе со строковыми и категориальными данными. Python обрабатывает строки как объекты с указателями, что приводит к фрагментации памяти и плохой локальности данных. Polars, благодаря использованию Rust и Arrow, работает со строками гораздо эффективнее, храня их в непрерывных блоках памяти.

Рассмотрим простой пример обработки строковых данных:

# Eager выполнение (как в Pandas)
df = pl.read_csv("large_file.csv")
filtered = df.filter(pl.col("amount") > 100)
result = filtered.group_by("category").agg(pl.col("amount").sum())

# Lazy выполнение с оптимизацией
result = (
    pl.scan_csv("large_file.csv")  # Не загружает файл сразу
    .filter(pl.col("amount") > 100)
    .group_by("category")
    .agg(pl.col("amount").sum())
    .collect()  # Только здесь происходит выполнение
)

API и выразительность кода в Polars спроектированы с учетом опыта Pandas, но с исправлением многих его недостатков. В Pandas для сложных операций часто приходится использовать метод apply с lambda-функциями, что приводит к построчной обработке данных и потере производительности. Polars предоставляет богатый набор встроенных выражений, которые выполняются на уровне колонок с использованием векторизации и SIMD-инструкций.

# Pandas - использование apply (медленно)
df_pandas['complex_calc'] = df_pandas.apply(
    lambda row: row['value'] * 2 if row['category'] == 'A' 
                else row['value'] * 1.5 if row['category'] == 'B'
                else row['value'],
    axis=1
)

# Polars - встроенные выражения (быстро)
df_polars = df_polars.with_columns(
    pl.when(pl.col('category') == 'A')
    .then(pl.col('value') * 2)
    .when(pl.col('category') == 'B')
    .then(pl.col('value') * 1.5)
    .otherwise(pl.col('value'))
    .alias('complex_calc')
)

Работа с памятью представляет еще одно важное отличие. Pandas обычно требует в 5-10 раз больше оперативной памяти, чем размер самих данных, для выполнения операций. Polars требует всего в 2-4 раза больше памяти благодаря более эффективному представлению данных и возможности работы в streaming режиме.

Типизация и схема данных в Polars строже, чем в Pandas. Polars не позволяет смешивать типы данных в одной колонке, что может показаться ограничением, но на практике приводит к более предсказуемому поведению и лучшей производительности. Все типы данных должны быть известны до выполнения запроса, что позволяет оптимизатору принимать более эффективные решения.

Polars vs PySpark

Сравнение с PySpark особенно интересно, поскольку обе библиотеки нацелены на обработку больших данных, но используют принципиально разные подходы.

PySpark построен на распределенной модели вычислений и предназначен для работы на кластерах из множества машин. Это дает ему теоретически неограниченную масштабируемость, но также привносит значительный overhead на координацию узлов, сериализацию данных и сетевые передачи. Для датасетов, которые помещаются в память одной машины (а это может быть и 100+ ГБ на современном железе), Polars часто оказывается быстрее PySpark благодаря отсутствию распределенного overhead.

Согласно бенчмаркам, на датасетах размером 10 ГБ Polars показывает производительность в 2-6 раз выше, чем Spark с Native Execution Engine. Особенно заметно преимущество на операциях, которые плохо параллелятся в распределенной среде, таких как сортировка или операции с зависимостями между строками.

Важным фактором является также простота развертывания и эксплуатации. Polars — это просто Python-библиотека, которая устанавливается через pip и работает на любой машине с Python. PySpark требует настройки кластера, управления ресурсами, настройки executor'ов и driver'а. Для многих задач аналитики данных сложность инфраструктуры Spark не оправдывает потенциальные преимущества в масштабируемости.

Интеграция Polars с другими инструментами

Важным аспектом любой библиотеки для работы с данными является ее интеграция с существующей экосистемой инструментов.

Библиотека Polars нативно поддерживает чтение и запись CSV, Parquet, JSON, Arrow IPC, Delta Lake и многих других форматов. Особенно эффективна работа с Parquet благодаря тесной интеграции с Arrow. Polars может читать Parquet файлы с применением предикатов и проекций, загружая только необходимые колонки и строки.

Интеграция с инструментами визуализации постоянно улучшается. Polars уже совместим с plotly, matplotlib (за исключением работы с Series), seaborn, altair и hvplot. Это позволяет использовать библиотеку для полноценного исследовательского анализа данных.

Машинное обучение становится все более доступным с Polars. Начиная с версии scikit-learn 1.4.0, можно выводить результаты трансформеров как Polars DataFrames. Также возможна конвертация в PyTorch тензоры и другие форматы данных для глубокого обучения. Хотя интеграция еще не такая бесшовная, как у Pandas, прогресс в этом направлении идет быстро.

Недостатки и ограничения Polars: честный взгляд

При всех преимуществах Polars, важно понимать его ограничения и случаи, когда другие инструменты могут быть предпочтительнее. Понимание этих ограничений поможет принять взвешенное решение о внедрении библиотеки в ваш технологический стек.

Зрелость устоявшейся экосистемы

Представьте, что вы переезжаете в новый город. Даже если он современнее и удобнее вашего старого места жительства, первое время вы будете скучать по привычным магазинам, знакомым маршрутам и устоявшимся связям. Похожая ситуация с переходом от Pandas к Polars. За более чем десятилетие существования Pandas накопил огромную базу знаний в виде статей, ответов на Stack Overflow, готовых рецептов и интеграций. Когда вы сталкиваетесь с проблемой в Pandas, вероятность найти готовое решение близка к 100%. С Polars вам чаще придется быть первопроходцем, особенно если задачи у вас нетривиальные.

Многие специализированные библиотеки все еще работают только с Pandas DataFrames. Например, некоторые статистические пакеты, библиотеки для работы с временными рядами или специализированные инструменты визуализации могут требовать конвертации из Polars в Pandas и обратно. Хотя эта конвертация обычно быстрая благодаря Apache Arrow, она добавляет дополнительный шаг в ваш пайплайн и может стать узким местом при частом использовании.

Отсутствие привычных концепций

Polars намеренно отказался от некоторых концепций Pandas, которые разработчики считали проблематичными. Самый заметный пример — отсутствие индексов в привычном понимании. В Pandas индекс является фундаментальной концепцией, которая влияет на поведение почти всех операций. В Polars строки просто пронумерованы от 0 до n-1, и эта нумерация не сохраняется при операциях.

Рассмотрим конкретный пример, где это создает сложности. В Pandas вы можете установить дату как индекс и затем использовать удобные методы для работы с временными рядами, такие как resample или rolling с временными окнами. В Polars вам нужно явно указывать колонку с датой в каждой операции:

# Импортируем библиотеку
import polars as pl
# Базовое выражение
expr = pl.col("value")
# Композиция выражений
complex_expr = (pl.col("value")
                .filter(pl.col("category") == "A")
                .sum()
                .over("department")  # Оконная функция    .alias("dept_sum_A")
)
# Использование when-then-otherwise для условной логики
conditional_expr = (pl.when(pl.col("score") > 90)
                    .then("excellent")
                    .when(pl.col("score") > 70)
                    .then("good")
                    .when(pl.col("score") > 50)
                    .then("average")
                    .otherwise("needs improvement")
                    .alias("grade")
)

Другое ограничение — невозможность смешивать типы данных в одной колонке. В Pandas вы можете иметь колонку, содержащую числа, строки и None одновременно. Это часто приводит к трудноуловимым ошибкам, но иногда бывает удобно для быстрого прототипирования или работы с грязными данными. Polars требует, чтобы все значения в колонке были одного типа, что заставляет вас принимать решения о структуре данных заранее.

Сложность переобучения

Переход на Polars требует изменения способа мышления о данных. В Pandas вы часто пишете императивный код — делаете это, затем то, потом еще что-то. В Polars вы строите выражения, которые описывают желаемый результат, а библиотека сама определяет, как его достичь. Это более функциональный подход, который может быть непривычен для разработчиков, пришедших из мира процедурного программирования.

Например, новички часто пытаются модифицировать DataFrame на месте, как в Pandas:

# Попытка изменить DataFrame на месте (не работает в Polars)
df['new_column'] = df['old_column'] * 2  # Ошибка!

# Правильный подход в Polars - создание нового DataFrame
df = df.with_columns(
    (pl.col('old_column') * 2).alias('new_column')
)

Эта иммутабельность на самом деле является преимуществом — она делает код более предсказуемым и облегчает отладку. Но для привыкания требуется время (однако если вы немного "скалист", то есть любитель языка Scala как автор статьи, то привыкать не придется).

Ограничения в специализированных областях

Для некоторых специфических задач Polars пока не предоставляет такого богатого функционала, как Pandas. Работа с мультииндексными данными, сложные операции с временными рядами, некоторые статистические функции — в этих областях Pandas остается более полнофункциональным решением. Хотя Polars активно развивается и постоянно добавляет новые возможности, для узкоспециализированных задач может потребоваться комбинирование обеих библиотек или написание собственных функций.

Практические примеры работы с данными в Polars

Допустим, прочитав эту статью, вы решили поработать с Polars. Стоит сказать, что с Polars будет просто работать тем, кто уже более-менее знает и Pandas, и PySpark. Давайте разберем несколько простых примеров кода.

Создание DataFrame

Polars предоставляет несколько способов создания DataFrame. Рассмотрим наиболее распространённые сценарии.

# Создание из словаря
df = pl.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'Diana'],
    'age': [25, 30, 35, 28],
    'city': ['NYC', 'LA', 'Chicago', 'NYC'],
    'salary': [70000, 85000, 95000, 72000]
})

print(df)
# shape: (4, 4)
# ┌─────────┬─────┬─────────┬────────┐
# │ name    ┆ age ┆ city    ┆ salary │
# │ ---     ┆ --- ┆ ---     ┆ ---    │
# │ str     ┆ i64 ┆ str     ┆ i64    │
# ╞═════════╪═════╪═════════╪════════╡
# │ Alice   ┆ 25  ┆ NYC     ┆ 70000  │
# │ Bob     ┆ 30  ┆ LA      ┆ 85000  │
# │ Charlie ┆ 35  ┆ Chicago ┆ 95000  │
# │ Diana   ┆ 28  ┆ NYC     ┆ 72000  │
# └─────────┴─────┴─────────┴────────┘

# Создание из списков с указанием схемы
df_with_schema = pl.DataFrame(
    data=[[1, "a"], [2, "b"], [3, "c"]],
    schema=["id", "letter"],
    orient="row"  # данные организованы по строкам
)

# Создание пустого DataFrame с заданной схемой
schema = {
    "id": pl.Int32,
    "name": pl.Utf8,
    "score": pl.Float64,
    "active": pl.Boolean
}
empty_df = pl.DataFrame(schema=schema)

Чтение и запись данных

Polars поддерживает множество форматов данных и предлагает оптимизированные методы для их обработки.

# Чтение CSV с автоматическим определением типов
df_csv = pl.read_csv(
    "data.csv",
    has_header=True,
    separator=",",
    try_parse_dates=True  # автоматически парсит даты
)

# Ленивое чтение CSV - не загружает в память сразу
lazy_df = pl.scan_csv("large_file.csv")
# Операции накапливаются и оптимизируются
result = (
    lazy_df
    .filter(pl.col("amount") > 1000)
    .group_by("category")
    .agg(pl.col("amount").sum())
    .collect()  # выполнение происходит только здесь
)

# Чтение Parquet - эффективный колоночный формат
df_parquet = pl.read_parquet("data.parquet")

# Чтение JSON
df_json = pl.read_json("data.json")

# Запись в различные форматы
df.write_csv("output.csv")
df.write_parquet("output.parquet", compression="snappy")
df.write_json("output.json")

# Чтение Excel (требует установки xlsx2csv или openpyxl)
df_excel = pl.read_excel("data.xlsx", sheet_name="Sheet1")

Базовые операции выборки и фильтрации

Polars использует выразительный API для работы с колонками через функцию pl.col()

# Создадим DataFrame для примеров
df = pl.DataFrame({
    'product': ['Apple', 'Banana', 'Orange', 'Apple', 'Banana', 'Orange'],
    'store': ['Store A', 'Store A', 'Store A', 'Store B', 'Store B', 'Store B'],
    'quantity': [10, 15, 8, 12, 20, 6],
    'price': [1.5, 0.5, 0.8, 1.4, 0.6, 0.9],
    'date': [date(2024, 1, 1), date(2024, 1, 1), date(2024, 1, 1),
             date(2024, 1, 2), date(2024, 1, 2), date(2024, 1, 2)]
})

# Выбор колонок
selected = df.select([
    pl.col("product"),
    pl.col("quantity"),
    pl.col("price")
])

# Выбор с переименованием
renamed = df.select([
    pl.col("product").alias("item"),
    pl.col("quantity").alias("qty"),
    (pl.col("quantity") * pl.col("price")).alias("total")
])

# Фильтрация
filtered = df.filter(pl.col("quantity") > 10)

# Множественные условия
complex_filter = df.filter(
    (pl.col("quantity") > 8) & 
    (pl.col("price") < 1.0) |
    (pl.col("product") == "Apple")
)

# Фильтрация с использованием is_in
stores_filter = df.filter(
    pl.col("store").is_in(["Store A", "Store C"])
)

# Фильтрация по паттерну в строках
pattern_filter = df.filter(
    pl.col("product").str.contains("an")  # Apple и Banana
)

Создание и модификация колонок

Polars предоставляет мощный механизм для создания новых колонок и модификации существующих.

# Добавление новых колонок с with_columns
df_enhanced = df.with_columns([
    # Простое вычисление
    (pl.col("quantity") * pl.col("price")).alias("revenue"),
    
    # Условная логика
    pl.when(pl.col("quantity") > 10)
      .then(pl.lit("High"))
      .when(pl.col("quantity") > 5)
      .then(pl.lit("Medium"))
      .otherwise(pl.lit("Low"))
      .alias("stock_level"),
    
    # Работа с датами
    pl.col("date").dt.day_of_week().alias("weekday"),
    pl.col("date").dt.month().alias("month"),
    
    # Математические операции
    (pl.col("price") * 1.2).round(2).alias("price_with_tax")
])

# Изменение существующих колонок
df_modified = df.with_columns([
    pl.col("price").round(1),  # округление на месте
    pl.col("product").str.to_uppercase()  # преобразование в верхний регистр
])

# Использование выражений для сложных преобразований
df_complex = df.with_columns([
    # Ранжирование внутри групп
    pl.col("quantity").rank().over("store").alias("rank_in_store"),
    
    # Процент от общего количества по магазину
    (pl.col("quantity") / pl.col("quantity").sum().over("store") * 100)
      .round(2)
      .alias("percent_of_store")
])

Группировка и агрегация

Группировка — одна из самых частых операций при анализе данных. Polars делает её быстрой и удобной.

# Простая группировка
grouped = df.group_by("product").agg([
    pl.col("quantity").sum().alias("total_quantity"),
    pl.col("price").mean().alias("avg_price"),
    pl.col("store").n_unique().alias("num_stores")
])

# Группировка по нескольким колонкам
multi_grouped = df.group_by(["store", "product"]).agg([
    pl.col("quantity").sum().alias("total_qty"),
    (pl.col("quantity") * pl.col("price")).sum().alias("revenue")
])

# Более сложные агрегации
advanced_agg = df.group_by("store").agg([
    # Статистические метрики
    pl.col("quantity").mean().alias("mean_qty"),
    pl.col("quantity").std().alias("std_qty"),
    pl.col("quantity").quantile(0.75).alias("q75_qty"),
    
    # Списки значений
    pl.col("product").unique().alias("products_list"),
    
    # Условные агрегации
    pl.col("quantity").filter(pl.col("product") == "Apple").sum().alias("apple_qty"),
    
    # Первое и последнее значение
    pl.col("price").first().alias("first_price"),
    pl.col("price").last().alias("last_price")
])

# Группировка с сохранением всех колонок
df_with_group_stats = df.with_columns([
    pl.col("quantity").mean().over("store").alias("store_avg_qty"),
    pl.col("quantity").max().over("store").alias("store_max_qty")
])

Соединение таблиц (Joins)

Polars поддерживает все стандартные типы соединений с оптимизированными алгоритмами.

# Создадим две таблицы для примера
products = pl.DataFrame({
    'product_id': [1, 2, 3, 4],
    'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor'],
    'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics']
})

sales = pl.DataFrame({
    'sale_id': [101, 102, 103, 104, 105],
    'product_id': [1, 2, 1, 3, 5],
    'quantity': [2, 5, 1, 3, 1],
    'date': [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 3), 
             date(2024, 1, 3), date(2024, 1, 4)]
})

# Inner join - только совпадающие записи
inner_joined = sales.join(
    products,
    on='product_id',
    how='inner'
)

# Left join - все записи из левой таблицы
left_joined = sales.join(
    products,
    on='product_id',
    how='left'
)

# Outer join - все записи из обеих таблиц
outer_joined = sales.join(
    products,
    on='product_id',
    how='outer'
)

# Semi join - записи из левой таблицы, имеющие совпадения в правой
semi_joined = sales.join(
    products,
    on='product_id',
    how='semi'
)

# Anti join - записи из левой таблицы, НЕ имеющие совпадений в правой
anti_joined = sales.join(
    products,
    on='product_id',
    how='anti'
)

# Join с разными названиями колонок
customers = pl.DataFrame({
    'customer_id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie']
})

orders = pl.DataFrame({
    'order_id': [1001, 1002, 1003],
    'cust_id': [1, 2, 1],  # другое название колонки
    'amount': [100, 200, 150]
})

joined_different_names = orders.join(
    customers,
    left_on='cust_id',
    right_on='customer_id',
    how='inner'
)

Оконные функции

Оконные функции позволяют выполнять вычисления над группами строк без явной группировки, сохраняя все строки в результате.

# Создадим данные для демонстрации
sales_data = pl.DataFrame({
    'date': [date(2024, 1, i) for i in range(1, 16)],
    'store': ['A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 
              'A', 'B', 'A', 'B', 'A'],
    'sales': [100, 150, 110, 160, 120, 140, 130, 170, 125, 165, 
              135, 175, 140, 180, 145]
})

# Применение оконных функций
windowed = sales_data.with_columns([
    # Скользящее среднее
    pl.col("sales").rolling_mean(window_size=3).alias("rolling_avg"),
    
    # Кумулятивная сумма
    pl.col("sales").cum_sum().alias("cumulative_sales"),
    
    # Ранжирование
    pl.col("sales").rank().alias("overall_rank"),
    
    # Ранжирование внутри группы
    pl.col("sales").rank().over("store").alias("store_rank"),
    
    # Процентили
    pl.col("sales").quantile(0.5).over("store").alias("store_median"),
    
    # Лаг и лид (сдвиг значений)
    pl.col("sales").shift(1).alias("prev_sales"),
    pl.col("sales").shift(-1).alias("next_sales"),
    
    # Изменение по сравнению с предыдущим значением
    (pl.col("sales") - pl.col("sales").shift(1)).alias("sales_change"),
    
    # Процент от суммы группы
    (pl.col("sales") / pl.col("sales").sum().over("store") * 100)
      .round(2)
      .alias("percent_of_store_total")
])

# Более сложные оконные вычисления
advanced_window = sales_data.with_columns([
    # Скользящее стандартное отклонение
    pl.col("sales").rolling_std(window_size=5).alias("rolling_std"),
    
    # Min и max в окне
    pl.col("sales").rolling_min(window_size=3).alias("rolling_min"),
    pl.col("sales").rolling_max(window_size=3).alias("rolling_max"),
    
    # Первое и последнее значение в группе
    pl.col("sales").first().over("store").alias("first_store_sale"),
    pl.col("sales").last().over("store").alias("last_store_sale"),
    
    # Z-score внутри группы
    ((pl.col("sales") - pl.col("sales").mean().over("store")) / 
     pl.col("sales").std().over("store")).alias("z_score")
])

Работа с временными рядами

Polars предоставляет богатый функционал для работы с датами и временем.

# Создание временного ряда
start_date = datetime(2024, 1, 1)
dates = [start_date + timedelta(hours=i) for i in range(168)]  # неделя данных

time_series = pl.DataFrame({
    'timestamp': dates,
    'value': np.random.randn(168).cumsum() + 100,
    'category': np.random.choice(['A', 'B', 'C'], 168)
})

# Извлечение компонентов даты/времени
time_features = time_series.with_columns([
    pl.col("timestamp").dt.year().alias("year"),
    pl.col("timestamp").dt.month().alias("month"),
    pl.col("timestamp").dt.day().alias("day"),
    pl.col("timestamp").dt.hour().alias("hour"),
    pl.col("timestamp").dt.day_of_week().alias("day_of_week"),
    pl.col("timestamp").dt.week().alias("week_number"),
    pl.col("timestamp").dt.quarter().alias("quarter")
])

# Ресемплинг (изменение частоты временного ряда)
hourly_avg = (
    time_series
    .group_by_dynamic(
        "timestamp",
        every="1h",  # группировка по часам
        period="1h",  # размер окна
        by="category"  # дополнительная группировка
    )
    .agg([
        pl.col("value").mean().alias("avg_value"),
        pl.col("value").max().alias("max_value"),
        pl.col("value").min().alias("min_value")
    ])
)

# Создание лагов для временных рядов
time_series_lags = time_series.sort("timestamp").with_columns([
    pl.col("value").shift(1).alias("lag_1h"),
    pl.col("value").shift(24).alias("lag_1d"),
    pl.col("value").shift(168).alias("lag_1w")
])

# Вычисление разниц во времени
events = pl.DataFrame({
    'event_id': [1, 2, 3, 4, 5],
    'start_time': [
        datetime(2024, 1, 1, 10, 0),
        datetime(2024, 1, 1, 11, 30),
        datetime(2024, 1, 1, 14, 0),
        datetime(2024, 1, 2, 9, 0),
        datetime(2024, 1, 2, 15, 30)
    ],
    'end_time': [
        datetime(2024, 1, 1, 11, 0),
        datetime(2024, 1, 1, 12, 45),
        datetime(2024, 1, 1, 16, 30),
        datetime(2024, 1, 2, 11, 30),
        datetime(2024, 1, 2, 17, 0)
    ]
})

events_duration = events.with_columns([
    (pl.col("end_time") - pl.col("start_time")).alias("duration"),
    (pl.col("end_time") - pl.col("start_time")).dt.total_seconds().alias("duration_seconds"),
    (pl.col("end_time") - pl.col("start_time")).dt.total_minutes().alias("duration_minutes")
])

Работа со строками

Polars предоставляет обширный набор методов для работы с текстовыми данными.

# Создадим DataFrame с текстовыми данными
text_df = pl.DataFrame({
    'email': ['john.doe@example.com', 'jane_smith@company.org', 
              'bob.wilson@email.co.uk', 'alice@domain.com'],
    'full_name': ['John Doe', 'Jane Smith', 'Bob Wilson', 'Alice Johnson'],
    'phone': ['+1-555-0100', '555.0200', '(555) 0300', '555-0400'],
    'description': ['Senior Developer', 'junior analyst', 'PROJECT MANAGER', 'data scientist']
})

# Базовые строковые операции
text_processed = text_df.with_columns([
    # Изменение регистра
    pl.col("full_name").str.to_uppercase().alias("name_upper"),
    pl.col("description").str.to_lowercase().alias("desc_lower"),
    pl.col("description").str.to_titlecase().alias("desc_title"),
    
    # Длина строки
    pl.col("email").str.len_chars().alias("email_length"),
    
    # Проверка на содержание подстроки
    pl.col("email").str.contains("@company").alias("is_company_email"),
    
    # Извлечение подстрок
    pl.col("email").str.split("@").list.get(0).alias("username"),
    pl.col("email").str.split("@").list.get(1).alias("domain"),
    
    # Замена подстрок
    pl.col("phone").str.replace_all(r"[^\d]", "").alias("phone_digits"),
    
    # Обрезка пробелов
    pl.col("description").str.strip_chars().alias("desc_trimmed")
])

# Регулярные выражения
regex_df = text_df.with_columns([
    # Извлечение по паттерну
    pl.col("email").str.extract(r"([^@]+)@([^.]+)\.(.+)", 1).alias("user"),
    pl.col("email").str.extract(r"([^@]+)@([^.]+)\.(.+)", 2).alias("domain_name"),
    pl.col("email").str.extract(r"([^@]+)@([^.]+)\.(.+)", 3).alias("tld"),
    
    # Проверка соответствия паттерну
    pl.col("phone").str.contains(r"^\+\d").alias("has_country_code"),
    
    # Замена по паттерну
    pl.col("phone").str.replace(r"^\+1-?", "").alias("phone_without_country")
])

# Работа с частями строк
name_parts = text_df.with_columns([
    # Разделение на части
    pl.col("full_name").str.split(" ").alias("name_parts"),
    
    # Первое и последнее слово
    pl.col("full_name").str.split(" ").list.get(0).alias("first_name"),
    pl.col("full_name").str.split(" ").list.get(-1).alias("last_name"),
    
    # Срезы строк
    pl.col("email").str.slice(0, 5).alias("email_prefix"),
    
    # Начинается/заканчивается с
    pl.col("description").str.starts_with("Senior").alias("is_senior"),
    pl.col("email").str.ends_with(".com").alias("is_dotcom")
])

Работа с отсутствующими значениями

Обработка null значений — важная часть подготовки данных.

# Создадим DataFrame с пропущенными значениями
df_with_nulls = pl.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', None, 'Charlie', 'Diana', None],
    'age': [25, 30, None, 28, 35],
    'salary': [70000, None, 95000, None, 80000],
    'department': ['IT', 'HR', 'IT', None, 'Finance']
})

# Проверка на null
null_check = df_with_nulls.with_columns([
    pl.col("name").is_null().alias("name_is_null"),
    pl.col("age").is_not_null().alias("age_is_not_null")
])

# Подсчёт null значений
null_counts = df_with_nulls.select([
    pl.all().null_count()
])

# Удаление строк с null
df_no_nulls = df_with_nulls.drop_nulls()  # удаляет строки с любым null
df_no_nulls_subset = df_with_nulls.drop_nulls(subset=["name", "age"])  # только по указанным колонкам

# Заполнение null значений
df_filled = df_with_nulls.with_columns([
    # Заполнение константой
    pl.col("name").fill_null("Unknown"),
    
    # Заполнение средним значением
    pl.col("age").fill_null(pl.col("age").mean()),
    
    # Заполнение медианой
    pl.col("salary").fill_null(pl.col("salary").median()),
    
    # Прямое заполнение (forward fill)
    pl.col("department").forward_fill()
])

# Интерполяция для числовых данных
numeric_series = pl.DataFrame({
    'time': range(10),
    'value': [1.0, None, None, 4.0, 5.0, None, 7.0, 8.0, None, 10.0]
})

interpolated = numeric_series.with_columns([
    pl.col("value").interpolate().alias("linear_interp")
])

# Условное заполнение
conditional_fill = df_with_nulls.with_columns([
    pl.when(pl.col("department") == "IT")
      .then(pl.col("salary").fill_null(85000))
      .otherwise(pl.col("salary").fill_null(75000))
      .alias("salary_filled")
])

Сводные таблицы и reshape операции

Polars поддерживает операции изменения формы данных, включая pivot и melt.

# Исходные данные в длинном формате
sales_long = pl.DataFrame({
    'date': ['2024-01-01', '2024-01-01', '2024-01-01', 
             '2024-01-02', '2024-01-02', '2024-01-02'],
    'product': ['A', 'B', 'C', 'A', 'B', 'C'],
    'store': ['Store1', 'Store1', 'Store1', 'Store1', 'Store1', 'Store1'],
    'sales': [100, 150, 120, 110, 160, 130]
})

# Pivot - из длинного в широкий формат
sales_wide = sales_long.pivot(
    values='sales',
    index='date',
    columns='product'
)
print(sales_wide)
# ┌────────────┬─────┬─────┬─────┐
# │ date       ┆ A   ┆ B   ┆ C   │
# │ ---        ┆ --- ┆ --- ┆ --- │
# │ str        ┆ i64 ┆ i64 ┆ i64 │
# ╞════════════╪═════╪═════╪═════╡
# │ 2024-01-01 ┆ 100 ┆ 150 ┆ 120 │
# │ 2024-01-02 ┆ 110 ┆ 160 ┆ 130 │
# └────────────┴─────┴─────┴─────┘

# Melt - из широкого в длинный формат
melted = sales_wide.melt(
    id_vars=['date'],
    value_vars=['A', 'B', 'C'],
    variable_name='product',
    value_name='sales'
)

# Транспонирование
transposed = pl.DataFrame({
    'metric': ['revenue', 'cost', 'profit'],
    'Q1': [1000, 600, 400],
    'Q2': [1200, 700, 500],
    'Q3': [1100, 650, 450],
    'Q4': [1300, 750, 550]
}).transpose(include_header=True, header_name="quarter")

Оптимизация скорости вычислений в Polars

Под конец статьи рассмотрим некоторые методы оптимизации производительности вычислений при использовании библиотеки Polars.

Пункт 1: используем ленивые вычисления везде где можем

# Создадим большой DataFrame для демонстрации
large_df = pl.DataFrame({
    'id': range(1_000_000),
    'group': np.random.choice(['A', 'B', 'C', 'D'], 1_000_000),
    'value1': np.random.randn(1_000_000),
    'value2': np.random.randn(1_000_000),
    'value3': np.random.randn(1_000_000)
})

# Eager (немедленное) выполнение - каждая операция выполняется сразу
eager_result = (
    large_df
    .filter(pl.col("value1") > 0)
    .with_columns([
        (pl.col("value2") * 2).alias("value2_doubled")
    ])
    .group_by("group")
    .agg([
        pl.col("value1").mean(),
        pl.col("value2_doubled").sum()
    ])
)

# Lazy (ленивое) выполнение - операции оптимизируются перед выполнением
lazy_result = (
    large_df.lazy()  # переход в ленивый режим
    .filter(pl.col("value1") > 0)
    .with_columns([
        (pl.col("value2") * 2).alias("value2_doubled")
    ])
    .group_by("group")
    .agg([
        pl.col("value1").mean(),
        pl.col("value2_doubled").sum()
    ])
    .collect()  # выполнение всего pipeline
)

# Просмотр плана выполнения
query_plan = (
    large_df.lazy()
    .filter(pl.col("value1") > 0)
    .select([pl.col("group"), pl.col("value1")])
    .group_by("group")
    .agg(pl.col("value1").mean())
)

# Неоптимизированный план
print(query_plan.explain(optimized=False))

# Оптимизированный план
print(query_plan.explain(optimized=True))

# Параллельное выполнение нескольких агрегаций
parallel_agg = (
    large_df.lazy()
    .group_by("group")
    .agg([
        # Все эти операции выполняются параллельно
        pl.col("value1").mean().alias("v1_mean"),
        pl.col("value1").std().alias("v1_std"),
        pl.col("value2").min().alias("v2_min"),
        pl.col("value2").max().alias("v2_max"),
        pl.col("value3").sum().alias("v3_sum"),
        pl.count().alias("count")
    ])
    .collect()
)

Пункт 2: параллельная обработка — используем все ядра процессора

Polars автоматически распараллеливает операции, но понимание того, как это работает, поможет вам писать более эффективный код.

import os
# Проверяем количество доступных ядер
print(f"Доступно ядер процессора: {os.cpu_count()}")

# Создаём тестовые данные
n_rows = 10_000_000
df_parallel = pl.DataFrame({
    'group': np.random.choice(list('ABCDEFGH'), n_rows),  # 8 групп для 8 ядер
    'value': np.random.randn(n_rows),
    'amount': np.random.randint(1, 1000, n_rows)
})

# ХОРОШО: Операции, которые отлично параллелятся
# Каждое ядро может обработать свою часть данных независимо
parallel_friendly = (
    df_parallel.lazy()
    .with_columns([
        # Эти операции выполняются параллельно для разных частей данных
        pl.col('value').abs().alias('abs_value'),
        pl.col('value').pow(2).alias('squared'),
        (pl.col('value') * pl.col('amount')).alias('weighted'),
        pl.when(pl.col('value') > 0)
          .then(pl.lit('positive'))
          .otherwise(pl.lit('negative'))
          .alias('sign')
    ])
    .collect()
)

# Группировка - Polars автоматически распределяет группы между ядрами
grouped_parallel = (
    df_parallel.lazy()
    .group_by('group')
    .agg([
        # Каждое ядро может обрабатывать свои группы
        pl.col('value').mean(),
        pl.col('value').std(),
        pl.col('amount').sum()
    ])
    .collect()
)

# ВАЖНО: Понимание partition_by для явного управления параллелизмом
# Это полезно, когда у вас есть независимые группы данных
def process_partition(partition_df):
    """Функция обработки одной партиции"""
    return partition_df.with_columns([
        pl.col('value').rolling_mean(window_size=100).alias('rolling_avg'),
        pl.col('value').rank().alias('rank')
    ])

# Разбиваем данные на партиции и обрабатываем параллельно
partitions = df_parallel.partition_by('group', maintain_order=False)
print(f"Создано партиций: {len(partitions)}")

# Обрабатываем каждую партицию
# В реальности Polars делает это автоматически, но иногда явное разделение полезно
processed_partitions = [process_partition(p) for p in partitions]
result_partitioned = pl.concat(processed_partitions)

# Контроль параллелизма через streaming
# Для очень больших данных, которые не помещаются в память
streaming_query = (
    pl.scan_csv("huge_file.csv")  # Не загружает файл целиком
    .filter(pl.col('value') > 0)
    .group_by('category')
    .agg([
        pl.col('amount').sum(),
        pl.col('value').mean()
    ])
    # streaming=True позволяет обрабатывать данные батчами
    .collect(streaming=True)
)

# Мониторинг использования ресурсов
def measure_parallel_efficiency(df, operation):
    """Измеряем эффективность параллелизации"""
    import time
    import multiprocessing
    
    # Запускаем с разным количеством потоков
    thread_counts = [1, 2, 4, 8]
    times = []
    
    for n_threads in thread_counts:
        # Устанавливаем количество потоков для Polars
        os.environ['POLARS_MAX_THREADS'] = str(n_threads)
        
        start = time.time()
        result = operation(df)
        elapsed = time.time() - start
        times.append(elapsed)
        
        print(f"Потоков: {n_threads}, Время: {elapsed:.2f}s")
    
    # Восстанавливаем настройки по умолчанию
    del os.environ['POLARS_MAX_THREADS']
    
    # Рассчитываем эффективность распараллеливания
    speedup = [times[0] / t for t in times]
    efficiency = [s / threads for s, threads in zip(speedup, thread_counts)]
    
    for threads, eff in zip(thread_counts, efficiency):
        print(f"Эффективность с {threads} потоками: {eff:.1%}")

Пункт 3: эффективная работа с памятью

Управление памятью критически важно при работе с большими данными. Polars предоставляет несколько стратегий для минимизации использования памяти.

# Стратегия 1: Инкрементальная обработка больших файлов
def process_large_csv_incrementally(file_path, batch_size=100_000):
    """
    Обрабатываем большой CSV файл батчами, не загружая его целиком в память
    """
    
    # Используем lazy reading с батчированием
    lazy_df = pl.scan_csv(file_path)
    
    # Определяем обработку
    processed = (
        lazy_df
        .filter(pl.col('value') > 0)
        .group_by('category')
        .agg([
            pl.col('amount').sum().alias('total_amount'),
            pl.col('value').mean().alias('avg_value')
        ])
    )
    
    # collect(streaming=True) обрабатывает данные батчами
    result = processed.collect(streaming=True)
    return result

# Стратегия 2: Использование sink_parquet для записи больших результатов
def process_and_save_large_data():
    """
    Обрабатываем данные и сохраняем результат без загрузки всего в память
    """
    
    # sink_parquet записывает результат напрямую на диск
    (
        pl.scan_csv("huge_input.csv")
        .filter(pl.col('important') == True)
        .with_columns([
            pl.col('value').rolling_mean(100).alias('smoothed')
        ])
        .sink_parquet("processed_output.parquet")
    )
    # Данные никогда не загружаются полностью в память!

# Стратегия 3: Освобождение памяти после операций
def memory_efficient_pipeline():
    """
    Явное управление памятью в сложном pipeline
    """
    import gc
    
    # Шаг 1: Загрузка и первичная обработка
    df1 = pl.read_csv("data1.csv")
    processed1 = df1.filter(pl.col('value') > 0)
    del df1  # Освобождаем исходные данные
    gc.collect()  # Принудительная сборка мусора
    
    # Шаг 2: Вторая таблица
    df2 = pl.read_csv("data2.csv")
    processed2 = df2.group_by('key').agg(pl.col('amount').sum())
    del df2
    gc.collect()
    
    # Шаг 3: Соединение
    result = processed1.join(processed2, on='key')
    del processed1, processed2  # Освобождаем промежуточные результаты
    gc.collect()
    
    return result

# Стратегия 4: Мониторинг использования памяти
def monitor_memory_usage():
    """
    Отслеживаем использование памяти во время выполнения
    """
    import psutil
    import os
    
    process = psutil.Process(os.getpid())
    
    def get_memory_mb():
        return process.memory_info().rss / 1024 / 1024
    
    print(f"Начальная память: {get_memory_mb():.1f} MB")
    
    # Создаём большой DataFrame
    df = pl.DataFrame({
        'data': np.random.randn(10_000_000)
    })
    print(f"После создания DataFrame: {get_memory_mb():.1f} MB")
    
    # Операция, увеличивающая использование памяти
    df_expanded = df.with_columns([
        pl.col('data').pow(2).alias('squared'),
        pl.col('data').abs().alias('absolute'),
        pl.col('data').rolling_mean(100).alias('rolling')
    ])
    print(f"После расширения: {get_memory_mb():.1f} MB")
    
    # Освобождаем память
    del df
    gc.collect()
    print(f"После удаления исходного: {get_memory_mb():.1f} MB")
    
    return df_expanded

# Стратегия 5: Использование категориальных типов для строк
def optimize_string_memory():
    """
    Демонстрация экономии памяти с категориальными типами
    """
    n = 1_000_000
    
    # Много повторяющихся строк
    string_data = np.random.choice(
        ['Customer_' + str(i) for i in range(1000)],  # 1000 уникальных значений
        n
    )
    
    # Обычные строки
    df_strings = pl.DataFrame({
        'customer': string_data,
        'value': np.random.randn(n)
    })
    
    # Категориальные строки
    df_categorical = pl.DataFrame({
        'customer': pl.Series(string_data, dtype=pl.Categorical),
        'value': np.random.randn(n)
    })
    
    print(f"Размер с обычными строками: {df_strings.estimated_size('mb'):.2f} MB")
    print(f"Размер с категориальными: {df_categorical.estimated_size('mb'):.2f} MB")
    print(f"Экономия: {(1 - df_categorical.estimated_size() / df_strings.estimated_size()) * 100:.1f}%")
    
    # ВАЖНО: Операции с категориальными типами тоже быстрее!
    start = time.time()
    grouped_strings = df_strings.group_by('customer').agg(pl.col('value').sum())
    print(f"Группировка строк: {time.time() - start:.3f}s")
    
    start = time.time()
    grouped_categorical = df_categorical.group_by('customer').agg(pl.col('value').sum())
    print(f"Группировка категорий: {time.time() - start:.3f}s")

Пункт 4: оптимизация операций соединения (Joins)

Соединение таблиц — одна из самых затратных операций. Polars использует умные алгоритмы, но правильная подготовка данных может ускорить процесс в разы.

# Создаём таблицы для соединения
n_left = 5_000_000
n_right = 1_000_000

left_table = pl.DataFrame({
    'key': np.random.randint(0, 500_000, n_left),
    'value_left': np.random.randn(n_left),
    'category': np.random.choice(['A', 'B', 'C'], n_left)
})

right_table = pl.DataFrame({
    'key': np.random.randint(0, 500_000, n_right),
    'value_right': np.random.randn(n_right),
    'info': np.random.choice(['X', 'Y', 'Z'], n_right)
})

# СТРАТЕГИЯ 1: Предварительная сортировка для sort-merge join
# Если данные отсортированы, join выполняется намного быстрее
print("Сравнение стратегий соединения:")

# Несортированные данные
start = time.time()
unsorted_join = left_table.join(right_table, on='key', how='inner')
print(f"Join без сортировки: {time.time() - start:.2f}s")

# Предварительно отсортированные данные
start = time.time()
left_sorted = left_table.sort('key')
right_sorted = right_table.sort('key')
sorted_join = left_sorted.join(right_sorted, on='key', how='inner')
print(f"Join с сортировкой: {time.time() - start:.2f}s")

# СТРАТЕГИЯ 2: Уменьшение размера таблиц перед соединением
# Фильтруйте и выбирайте только нужные колонки ДО join
efficient_join = (
    left_table.lazy()
    .filter(pl.col('category') == 'A')  # Фильтр ДО join
    .select(['key', 'value_left'])  # Только нужные колонки
    .join(
        right_table.lazy()
        .filter(pl.col('info') != 'Z')  # Фильтр справа тоже
        .select(['key', 'value_right']),
        on='key',
        how='inner'
    )
    .collect()
)

# СТРАТЕГИЯ 3: Использование join_asof для временных рядов
# Для данных с временными метками join_asof намного эффективнее
trades = pl.DataFrame({
    'time': pl.date_range(
        datetime(2024, 1, 1), 
        datetime(2024, 1, 2), 
        interval='1s',
        eager=True
    )[:10000],
    'price': np.random.uniform(100, 200, 10000)
}).sort('time')

quotes = pl.DataFrame({
    'time': pl.date_range(
        datetime(2024, 1, 1), 
        datetime(2024, 1, 2), 
        interval='100ms',
        eager=True
    )[:100000],
    'bid': np.random.uniform(99, 199, 100000)
}).sort('time')

# join_asof находит ближайшее значение по времени
asof_result = trades.join_asof(
    quotes,
    on='time',
    strategy='backward'  # Берём последнюю котировку перед сделкой
)

# СТРАТЕГИЯ 4: Broadcast join для маленькой таблицы справа
# Если правая таблица маленькая, Polars может разослать её копии всем потокам
small_lookup = pl.DataFrame({
    'category_id': [1, 2, 3, 4, 5],
    'category_name': ['Electronics', 'Books', 'Clothing', 'Food', 'Sports']
})

large_transactions = pl.DataFrame({
    'transaction_id': range(1_000_000),
    'category_id': np.random.randint(1, 6, 1_000_000),
    'amount': np.random.uniform(10, 1000, 1_000_000)
})

# Polars автоматически определит, что small_lookup маленькая
# и использует broadcast join
broadcast_join = large_transactions.join(
    small_lookup,
    on='category_id',
    how='left'
)

# ПРОДВИНУТАЯ ОПТИМИЗАЦИЯ: Множественные join в правильном порядке
def optimize_multi_join():
    """
    При множественных join порядок имеет значение!
    Начинайте с самых селективных join (которые сильнее уменьшают данные)
    """
    
    # Создаём таблицы разного размера
    huge_table = pl.DataFrame({
        'id': range(10_000_000),
        'key1': np.random.randint(0, 1000, 10_000_000),
        'key2': np.random.randint(0, 100, 10_000_000),
        'value': np.random.randn(10_000_000)
    })
    
    medium_table = pl.DataFrame({
        'key1': range(1000),
        'info1': np.random.choice(['A', 'B', 'C'], 1000)
    })
    
    small_table = pl.DataFrame({
        'key2': range(100),
        'info2': np.random.choice(['X', 'Y'], 100)
    })
    
    # ПРАВИЛЬНЫЙ порядок: сначала join с маленькой таблицей
    efficient_order = (
        huge_table.lazy()
        .join(small_table.lazy(), on='key2', how='inner')  # Сильно уменьшает размер
        .join(medium_table.lazy(), on='key1', how='inner')  # Затем со средней
        .collect()
    )
    
    return efficient_order

Пункт 5: Оптимизация агрегаций и группировок

Группировка и агрегация — одни из самых "тяжелых" операций в преобразовании данных. Polars предоставляет множество способов сделать эти операции максимально эффективными. Ключ к успеху — понимание того, как Polars распределяет работу между потоками и как можно помочь ему в этом.

# Создаём данные для экспериментов
n = 10_000_000
df_agg = pl.DataFrame({
    'group_high_card': np.random.randint(0, 100_000, n),  # Много групп
    'group_low_card': np.random.choice(['A', 'B', 'C', 'D'], n),  # Мало групп
    'value1': np.random.randn(n),
    'value2': np.random.uniform(0, 100, n),
    'value3': np.random.randint(0, 1000, n)
})

# ПРИНЦИП 1: Минимизируйте количество проходов по данным
# ПЛОХО - множественные группировки
start = time.time()
result1 = df_agg.group_by('group_low_card').agg(pl.col('value1').mean())
result2 = df_agg.group_by('group_low_card').agg(pl.col('value2').sum())
result3 = df_agg.group_by('group_low_card').agg(pl.col('value3').max())
bad_result = result1.join(result2, on='group_low_card').join(result3, on='group_low_card')
print(f"Множественные группировки: {time.time() - start:.2f}s")

# ХОРОШО - одна группировка с множественными агрегациями
start = time.time()
good_result = df_agg.group_by('group_low_card').agg([
    pl.col('value1').mean(),
    pl.col('value2').sum(),
    pl.col('value3').max()
])
print(f"Одна группировка: {time.time() - start:.2f}s")

# ПРИНЦИП 2: Используйте эффективные агрегации
def compare_aggregation_methods():
    """
    Некоторые агрегации намного эффективнее других
    """
    
    # Эффективные агрегации (O(n))
    efficient_aggs = df_agg.group_by('group_low_card').agg([
        pl.col('value1').sum(),      # Простая сумма
        pl.col('value1').mean(),     # Среднее
        pl.col('value1').min(),      # Минимум
        pl.col('value1').max(),      # Максимум
        pl.col('value1').count(),    # Подсчёт
        pl.col('value1').first(),    # Первое значение
        pl.col('value1').last(),     # Последнее значение
    ])
    
    # Менее эффективные агрегации (требуют сортировки или хранения всех значений)
    expensive_aggs = df_agg.group_by('group_low_card').agg([
        pl.col('value1').median(),           # Требует сортировки
        pl.col('value1').quantile(0.75),     # Требует сортировки
        pl.col('value1').std(),              # Требует два прохода
        pl.col('value1').unique().len(),     # Требует хранения уникальных значений
        pl.col('value1').sort().list(),      # Создаёт список всех значений
    ])
    
    return efficient_aggs, expensive_aggs

# ПРИНЦИП 3: Предварительная фильтрация в агрегациях
# Вместо фильтрации после группировки, фильтруйте внутри агрегации
better_aggregation = df_agg.group_by('group_low_card').agg([
    # Условная агрегация - очень эффективно!
    pl.col('value1').filter(pl.col('value1') > 0).sum().alias('positive_sum'),
    pl.col('value1').filter(pl.col('value1') < 0).sum().alias('negative_sum'),
    
    # Подсчёт с условием
    (pl.col('value2') > 50).sum().alias('count_above_50'),
    
    # Взвешенное среднее
    (pl.col('value1') * pl.col('value3')).sum() / pl.col('value3').sum()
])

# ПРИНЦИП 4: Оптимизация группировки по высококардинальным колонкам
def optimize_high_cardinality_groupby():
    """
    Когда у вас много уникальных групп, важна стратегия
    """
    
    # Для высококардинальных групп используйте streaming
    high_card_result = (
        df_agg.lazy()
        .group_by('group_high_card')
        .agg([
            pl.col('value1').mean(),
            pl.col('value2').sum()
        ])
        .collect(streaming=True)  # Обрабатывает батчами
    )
    
    # Или разбивайте на части если нужна специальная обработка
    n_partitions = 10
    partition_size = 10_000
    
    results = []
    for i in range(n_partitions):
        mask = (pl.col('group_high_card') >= i * partition_size) & \
               (pl.col('group_high_card') < (i + 1) * partition_size)
        
        partial_result = (
            df_agg.lazy()
            .filter(mask)
            .group_by('group_high_card')
            .agg(pl.col('value1').mean())
            .collect()
        )
        results.append(partial_result)
    
    final_result = pl.concat(results)
    return final_result

# ПРИНЦИП 5: Использование rolling aggregations для временных рядов
time_series_df = pl.DataFrame({
    'timestamp': pl.date_range(
        datetime(2024, 1, 1),
        datetime(2024, 12, 31),
        interval='1h',
        eager=True
    ),
    'value': np.random.randn(8760),  # Часовые данные за год
    'category': np.random.choice(['A', 'B'], 8760)
})

# Эффективные скользящие агрегации
rolling_result = time_series_df.with_columns([
    # Фиксированное окно
    pl.col('value').rolling_mean(window_size=24).alias('daily_avg'),
    
    # Динамическое окно по времени
    pl.col('value').rolling_mean_by(
        'timestamp',
        window_size='7d',  # 7 дней
        closed='left'
    ).alias('weekly_avg')
])

# Группированные скользящие агрегации
grouped_rolling = time_series_df.with_columns([
    pl.col('value')
      .rolling_mean(window_size=24)
      .over('category')
      .alias('daily_avg_by_category')
])

Заключение

Polars представляет собой не просто "еще одну библиотеку для работы с данными", а полное переосмысление того, как должна работать обработка табличных данных в современном мире. Построенная с нуля с учетом современного железа, параллельных вычислений и оптимизации запросов, эта библиотека дает производительность, которая в разы, а иногда и в десятки раз превосходит традиционные инструменты.

Ключевые преимущества Polars можно суммировать следующим образом.

  • Производительность на уровне низкоуровневых языков благодаря Rust, но с удобством Python API.

  • Автоматическое использование всех ядер процессора без дополнительной настройки.

  • Возможность работы с данными, превышающими объем RAM, через streaming engine.

  • Мощный оптимизатор запросов, который автоматически выбирает наилучший план выполнения.

  • Полноценная поддержка SQL для плавного перехода от традиционных инструментов.

  • Строгая типизация и предсказуемое поведение, снижающие количество ошибок в продакшене.

Выбор между Polars и альтернативами зависит от конкретных задач и контекста. Для проектов, где производительность критична, где работа идет с датасетами от сотен мегабайт до сотен гигабайт, где важна эффективность использования ресурсов — Polars становится очевидным выбором.

Для небольших ad-hoc анализов, прототипирования или работы в экосистеме, тесно связанной с Pandas, традиционный подход может оставаться предпочтительным. Однако важно понимать, что Polars и Pandas не обязательно должны быть взаимоисключающими выборами. Можно использовать обе библиотеки: Polars для тяжелых ETL-процессов и обработки больших данных, Pandas для финальной аналитики и интеграции с инструментами визуализации и машинного обучения. Возможность легкой конвертации между форматами делает такой гибридный подход вполне жизнеспособным.

Будущее Polars выглядит многообещающе. С растущим сообществом разработчиков и постоянными улучшениями производительности, библиотека продолжает активно развиваться. Планируемый переход на streaming engine по умолчанию сделает работу с большими данными еще более эффективной. Развитие Polars Cloud обещает решить последнее ограничение — привязку к ресурсам одной машины. Улучшения в области машинного обучения и расширение функциональности делают Polars все более привлекательным выбором для современных data-интенсивных приложений.

Мы живем в эпоху, когда объемы данных растут экспоненциально, а традиционные инструменты достигли своих архитектурных пределов. Polars — это ответ на вызовы современной эры больших данных, и его популярность будет только расти. Если вы еще не пробовали Polars, самое время начать! Установка занимает секунды, синтаксис интуитивен, а прирост производительности может изменить ваше представление о том, что возможно в обработке данных на одной машине.

И финальная благодарность от автора: спасибо, что прочитали эту статью до конца!

А если моя статья вам понравилась, то рекомендую подписаться на мой совсем молодой, но будущий легендарный телеграм-канал "Данные по-большому" с подробными материалами по инженерии данных (и не только!), обзорами на дата-инженерные книги и новые технологии в области DE/DS/ML, новостями и мемами об айтишке и околоайтишке.


Приложение 1: все бенчмарки Polars против других популярных решений

Результаты PDS-H бенчмарка

Сравнение по производительности Polars и Pandas

Polars против DuckDB

Polars против PySpark

Еще бенчмарки

Приложение 2: книги и курсы для изучения

Книги

Курсы и туториалы

Примеры кода для продолжающих:

Комментарии (0)