Привет! В первой части я поделился мыслями, которые побудили к созданию python библиотеки convtools. Кратко о ней: предоставляет примитивы (конверсии), объединяя которые, можно описывать сложные конверсии для обработки данных. Конверсия генерирует узкоспециализированный код, компилирует его и возвращает функцию, решающую конкретную задачу.
В этот раз хотелось бы подробнее остановиться на двух моментах:
как
pipe
позволяет повысить переиспользуемость кодановая часть библиотеки:
Table
- потоковый обработчик табличных данных
1) Pipes & code reuse
Надеюсь, что читать код с комментариями удобнее, чем оторванное от него косноязычное повествование, поэтому приступим:
from datetime import datetime
from decimal import Decimal
from convtools import conversion as c
# Допустим нам нужно прочесть некий отчет и поработать с ним.
# описываем конверсии, необходимые при чтении отчета
# сохраним то, как будут обрабатываться все входные строки
c_str = c.this().call_method("strip")
# то, как из обработанной строки получить Decimal (убиваем запятые -
# разделители групп разрядов)
c_decimal = c_str.call_method("replace", ",", "").as_type(Decimal)
c_parse_date = c.call_func(
datetime.strptime, c.this(), "%Y-%m-%d"
).call_method("date")
c_date = c_str.pipe(c_parse_date)
c_optional_date = c_str.pipe(c.if_(c.this(), с_parse_date, None))
c_optional_decimal = c_str.pipe(c.if_(c.this(), c_decimal, None))
# разменяем входные строки на понятные типы через (тут должен быть
# Enum, но с диктом тоже красиво)
c_product_type = c.naive(
{
"phone": 1,
"laptop": 2,
}
).item(c_str.call_method("lower"))
# теперь определим, что нас может интересовать в данных, направляя
# каждое из полей в необходимую конверсию
schema = {
"full_name": c.item("Full Name").pipe(c_str),
"sku": c.item("SKU").pipe(c_str),
"product_type": c.item("Product Type").pipe(c_product_type),
"earnings": c.item("Earnings").pipe(c_decimal),
"refunds": c.item("Refunds").pipe(c_optional_decimal),
"date": c.item("Date").pipe(c_date),
"date_of_birth": c.item("Date of Birth").pipe(c_optional_date),
}
# теперь можем забыть о том, как что и откуда берется.
# можно описать необходимую логику
converter = (
# группируем по именам и датам
c.group_by(schema["full_name"], schema["date"])
.aggregate(
{
"full_name": schema["full_name"],
"date": schema["date"],
"date_of_birth": c.ReduceFuncs.First(schema["date_of_birth"]),
"total_earnings": c.ReduceFuncs.Sum(schema["earnings"]),
}
)
# отбросим "начинающих"
.filter(c.item("total_earnings") > 1000)
# сгруппируем по датам, соберем якобы полезную аналитику:
# - кол-во не "начинающих"
# - их средний заработок
# - их медианная дата рождения
# - dict: SKU -> сумма заработков (агрегация в агрегации)
.pipe(
c.group_by(c.item("date")).aggregate(
{
"date": c.item("date"),
"number_of_workers": c.ReduceFuncs.Count(),
"average_earnings": c.ReduceFuncs.Average(
c.item("total_earnings")
),
"median_date_of_birth": c.ReduceFuncs.Median(
c.item("date_of_birth")
),
"earnings_by_sku": c.ReduceFuncs.DictSum(
c.item("sku"),
c.item("total_earnings"),
),
}
)
)
# debug=True - выведет в stdout сгенерированный код.
# если установлен black, он будет еще и отформатирован
.gen_converter()
)
converter(input_data)
сгенерированный код
def pipe__cp(input__cp, _naive, _labels, _none):
return (
_naive["strptime_ox"](input__cp.strip(), _naive["v_7o"]).date()
if input__cp
else None
)
class AggData__cg:
__slots__ = ["v0", "v1"]
def __init__(self, _none=__none__):
self.v0 = _none
self.v1 = _none
def group_by__cg(data_, _naive, _labels, _none):
signature_to_agg_data__cg = defaultdict(AggData__cg)
for row__cg in data_:
agg_data__cg = signature_to_agg_data__cg[
(
row__cg["Full Name"].strip(),
_naive["strptime_ox"](
row__cg["Date"].strip(), _naive["v_7o"]
).date(),
)
]
if agg_data__cg.v0 is _none:
agg_data__cg.v0 = pipe__cp(
row__cg["Date of Birth"].strip(), _naive, _labels, _none
)
agg_data__cg.v1 = (
_naive["Decimal_4v"](
row__cg["Earnings"].strip().replace(",", "")
)
or 0
)
else:
agg_data__cg.v1 = agg_data__cg.v1 + (
_naive["Decimal_4v"](
row__cg["Earnings"].strip().replace(",", "")
)
or 0
)
result_ = (
{
"full_name": signature__cg[0],
"date": signature__cg[1],
"date_of_birth": (
None if agg_data__cg.v0 is _none else agg_data__cg.v0
),
"total_earnings": (
0 if agg_data__cg.v1 is _none else agg_data__cg.v1
),
}
for signature__cg, agg_data__cg in signature_to_agg_data__cg.items()
)
filtered_result_ = [
i_4z for i_4z in result_ if (i_4z["total_earnings"] > 1000)
]
return filtered_result_
class AggData__gp:
__slots__ = ["v0", "v1", "v2", "v3"]
def __init__(self, _none=__none__):
self.v0 = _none
self.v1 = _none
self.v2 = _none
self.v3 = _none
def group_by__gp(data_, _naive, _labels, _none):
signature_to_agg_data__gp = defaultdict(AggData__gp)
for row__gp in data_:
agg_data__gp = signature_to_agg_data__gp[row__gp["date"]]
if agg_data__gp.v0 is _none:
agg_data__gp.v0 = 1
agg_data__gp.v2 = [row__gp["date_of_birth"]]
agg_data__gp.v3 = _d = defaultdict(int)
_d[row__gp["sku"]] = row__gp["total_earnings"] or 0
else:
agg_data__gp.v0 = agg_data__gp.v0 + 1
agg_data__gp.v2.append(row__gp["date_of_birth"])
agg_data__gp.v3[row__gp["sku"]] = agg_data__gp.v3[
row__gp["sku"]
] + (row__gp["total_earnings"] or 0)
if agg_data__gp.v1 is _none:
if row__gp["total_earnings"] is not None:
agg_data__gp.v1 = (1, row__gp["total_earnings"] * 1)
else:
if row__gp["total_earnings"] is not None:
agg_data__gp.v1 = (
agg_data__gp.v1[0] + 1,
agg_data__gp.v1[1] + row__gp["total_earnings"] * 1,
)
return [
{
"date": signature__gp,
"number_of_workers": (
0 if agg_data__gp.v0 is _none else agg_data__gp.v0
),
"average_earnings": (
None
if agg_data__gp.v1 is _none
else (agg_data__gp.v1[1] / agg_data__gp.v1[0])
),
"median_date_of_birth": (
None
if agg_data__gp.v2 is _none
else _naive["median_59"](agg_data__gp.v2)
),
"earnings_by_sku": (
None if agg_data__gp.v3 is _none else (dict(agg_data__gp.v3))
),
}
for signature__gp, agg_data__gp in signature_to_agg_data__gp.items()
]
def converter_uk(data_):
global __naive_values__, __none__
_naive = __naive_values__
_none = __none__
_labels = {}
return _naive["group_by__gp"](
_naive["group_by__cg"](data_, _naive, _labels, _none),
_naive,
_labels,
_none,
)
Так как все конверсии являются выражениями, метод pipe
позволяет комбинировать практически любые из них. pipe
-ы разумно воспринимать как бесплатную с точки зрения производительности абстракцию, так как в случаях, когда не требуется добавление label
-ов и входные данные не используются более одного раза, pipe
не приводит к дополнительному вызову функции.
2) Table - потоковая обработка табличных данных
Время от времени у меня появлялась необходимость в работе с табличными данными (или схожими с ними). И меня всегда прельщала мысль "запечь" в код индексы колонок, чтобы обращения к ним были дешевыми. И вот, наконец, имея под рукой примитивы convtools, получилось реализовать задуманное, совершая малое количество телодвижений.
Логика работы: Table
работает с итераторами лениво, вычитывая только первую строку, в которой может содержаться заголовок. Далее, опираясь на заголовок и запрошенные изменения, пишется код, который выполнит нужные преобразования над имеющимся итератором. Методы into_iter_rows
и into_csv
являются конечными.
from convtools.contrib.tables import Table
from convtools import conversion as c
(
Table.from_csv(
"tests/csvs/ac.csv",
header=True,
dialect=Table.csv_dialect(delimiter="\t"),
)
# возьмем только колонки "a" и "c"
.take("a", "c")
# добавим две новых вычисляемых колонки
.update(B=c.col("a") + c.col("c"), D=c.call_func(abs, c.col("c")))
# переименуем
.rename({"a": "A"})
# отфильтруем строки
.filter(c.col("c") < 10)
# отбросим колонку "c"
.drop("c").into_csv("tests/csvs/out.csv")
)
# объединим две таблицы по колонке "a"
list(
Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
.join(
Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
how="inner",
on=["a"],
)
.into_iter_rows(dict) # поддерживаются list/tuple
)
Также добавлены два изредка полезных метода:
"""
Table 1 Table 2
| a | b | | b | c |
| 1 | 2 | | 3 | 4 |
>>> table1.chain(table2, fill_value=" ")
Result:
| a | b | c |
| 1 | 2 | |
| | 3 | 4 |
"""
"""
Table 1 Table 2
| a | b | | b | c |
| 1 | 2 | | 3 | 4 |
| 5 | 6 |
>>> table1.zip(table2, fill_value=" ")
Result:
| a | b | b | c |
| 1 | 2 | 3 | 4 |
| | | 5 | 6 |
"""
Заключение
Конечно, я не питаю иллюзий, что все резко бросятся использовать convtools, но всё равно захотелось поделиться. И даже не потому, что codegen добавляет гибкость, просто меня написание кода в таком стиле ставит на другие рельсы. С таким подходом я допускаю значительно меньше ошибок, становятся более очевидными стадии обработки данных, облегчается тестирование кода, т.к. работа ведется с чистыми "почти-функциями" + convtools поощряет работу с iterator-ами, что положительно сказывается на потребляемой памяти.
Всем успехов и, если что-нибудь придумаю или подскажете, до скорого!
eigrad
Чтобы это не было отвратительно медленно в конечном итоге всё равно придется завести какую-нибудь громоздкую dask/spark/flink/datafusion.
westandskif Автор
Если были проведены какие-то тесты, то приложите, пожалуйста.
Т.к. своё тестирование на скорость я не считаю сколько бы то нибыло полным, я его не приводил. Но в качестве ответа на комментарий приведу пример:
На Python 3.6.12 клеил 2 одинаковых CSV файла размером 1.3МБ, 7.7к строк каждый по значениям в 4 колонках.
На выходе получался CSV файл размером 143МБ. Сравнивались результаты, всё сошлось.
Далее я исключил запись на диск и сравнил скорость:
- Table took 1.9088249206542969 s
- Pandas took 3.9153363704681396 s
- Polars took 2.842676877975464 s
Поэтому "отвратительно медленно" это скорее открытый вопрос, на самом деле нужно проверять насколько :) Все зависит от условий.
P.S. я не предлагаю обрабатывать тонны данных, скорее предлагаю иметь в виду такой инструмент, т.к. простота конечного кода порой выливается в приемлемой скорости.
Ну и неплохой бонус: если через
Table
вычитать файл и сложить обратно на диск, можно верить, что файл не изменился.С
pandas
нужно поднапрячься, чтобы файл не мутировал.P.S.2 Если задача в анализе сотен гигов и более, то можно вгрузить в Clickhouse и там удобно анализировать