Привет! В первой части я поделился мыслями, которые побудили к созданию python библиотеки convtools. Кратко о ней: предоставляет примитивы (конверсии), объединяя которые, можно описывать сложные конверсии для обработки данных. Конверсия генерирует узкоспециализированный код, компилирует его и возвращает функцию, решающую конкретную задачу.

В этот раз хотелось бы подробнее остановиться на двух моментах:

  • как pipe позволяет повысить переиспользуемость кода

  • новая часть библиотеки: Table - потоковый обработчик табличных данных

1) Pipes & code reuse

Надеюсь, что читать код с комментариями удобнее, чем оторванное от него косноязычное повествование, поэтому приступим:

from datetime import datetime
from decimal import Decimal

from convtools import conversion as c


# Допустим нам нужно прочесть некий отчет и поработать с ним.

# описываем конверсии, необходимые при чтении отчета
# сохраним то, как будут обрабатываться все входные строки
c_str = c.this().call_method("strip")
# то, как из обработанной строки получить Decimal (убиваем запятые -
# разделители групп разрядов)
c_decimal = c_str.call_method("replace", ",", "").as_type(Decimal)
c_parse_date = c.call_func(
    datetime.strptime, c.this(), "%Y-%m-%d"
).call_method("date")
c_date = c_str.pipe(c_parse_date)

c_optional_date = c_str.pipe(c.if_(c.this(), с_parse_date, None))
c_optional_decimal = c_str.pipe(c.if_(c.this(), c_decimal, None))

# разменяем входные строки на понятные типы через (тут должен быть
# Enum, но с диктом тоже красиво)
c_product_type = c.naive(
    {
        "phone": 1,
        "laptop": 2,
    }
).item(c_str.call_method("lower"))


# теперь определим, что нас может интересовать в данных, направляя
# каждое из полей в необходимую конверсию
schema = {
    "full_name": c.item("Full Name").pipe(c_str),
    "sku": c.item("SKU").pipe(c_str),
    "product_type": c.item("Product Type").pipe(c_product_type),
    "earnings": c.item("Earnings").pipe(c_decimal),
    "refunds": c.item("Refunds").pipe(c_optional_decimal),
    "date": c.item("Date").pipe(c_date),
    "date_of_birth": c.item("Date of Birth").pipe(c_optional_date),
}

# теперь можем забыть о том, как что и откуда берется.
# можно описать необходимую логику
converter = (
    # группируем по именам и датам
    c.group_by(schema["full_name"], schema["date"])
    .aggregate(
        {
            "full_name": schema["full_name"],
            "date": schema["date"],
            "date_of_birth": c.ReduceFuncs.First(schema["date_of_birth"]),
            "total_earnings": c.ReduceFuncs.Sum(schema["earnings"]),
        }
    )
    # отбросим "начинающих"
    .filter(c.item("total_earnings") > 1000)
    # сгруппируем по датам, соберем якобы полезную аналитику:
    #  - кол-во не "начинающих"
    #  - их средний заработок
    #  - их медианная дата рождения
    #  - dict: SKU -> сумма заработков (агрегация в агрегации)
    .pipe(
        c.group_by(c.item("date")).aggregate(
            {
                "date": c.item("date"),
                "number_of_workers": c.ReduceFuncs.Count(),
                "average_earnings": c.ReduceFuncs.Average(
                    c.item("total_earnings")
                ),
                "median_date_of_birth": c.ReduceFuncs.Median(
                    c.item("date_of_birth")
                ),
                "earnings_by_sku": c.ReduceFuncs.DictSum(
                    c.item("sku"),
                    c.item("total_earnings"),
                ),
            }
        )
    )
    # debug=True - выведет в stdout сгенерированный код.
    # если установлен black, он будет еще и отформатирован
    .gen_converter()
)

converter(input_data)
сгенерированный код
def pipe__cp(input__cp, _naive, _labels, _none):
    return (
        _naive["strptime_ox"](input__cp.strip(), _naive["v_7o"]).date()
        if input__cp
        else None
    )


class AggData__cg:
    __slots__ = ["v0", "v1"]

    def __init__(self, _none=__none__):
        self.v0 = _none
        self.v1 = _none


def group_by__cg(data_, _naive, _labels, _none):
    signature_to_agg_data__cg = defaultdict(AggData__cg)

    for row__cg in data_:
        agg_data__cg = signature_to_agg_data__cg[
            (
                row__cg["Full Name"].strip(),
                _naive["strptime_ox"](
                    row__cg["Date"].strip(), _naive["v_7o"]
                ).date(),
            )
        ]
        if agg_data__cg.v0 is _none:
            agg_data__cg.v0 = pipe__cp(
                row__cg["Date of Birth"].strip(), _naive, _labels, _none
            )
            agg_data__cg.v1 = (
                _naive["Decimal_4v"](
                    row__cg["Earnings"].strip().replace(",", "")
                )
                or 0
            )
        else:
            agg_data__cg.v1 = agg_data__cg.v1 + (
                _naive["Decimal_4v"](
                    row__cg["Earnings"].strip().replace(",", "")
                )
                or 0
            )

    result_ = (
        {
            "full_name": signature__cg[0],
            "date": signature__cg[1],
            "date_of_birth": (
                None if agg_data__cg.v0 is _none else agg_data__cg.v0
            ),
            "total_earnings": (
                0 if agg_data__cg.v1 is _none else agg_data__cg.v1
            ),
        }
        for signature__cg, agg_data__cg in signature_to_agg_data__cg.items()
    )
    filtered_result_ = [
        i_4z for i_4z in result_ if (i_4z["total_earnings"] > 1000)
    ]
    return filtered_result_


class AggData__gp:
    __slots__ = ["v0", "v1", "v2", "v3"]

    def __init__(self, _none=__none__):
        self.v0 = _none
        self.v1 = _none
        self.v2 = _none
        self.v3 = _none


def group_by__gp(data_, _naive, _labels, _none):
    signature_to_agg_data__gp = defaultdict(AggData__gp)

    for row__gp in data_:
        agg_data__gp = signature_to_agg_data__gp[row__gp["date"]]
        if agg_data__gp.v0 is _none:
            agg_data__gp.v0 = 1
            agg_data__gp.v2 = [row__gp["date_of_birth"]]
            agg_data__gp.v3 = _d = defaultdict(int)
            _d[row__gp["sku"]] = row__gp["total_earnings"] or 0
        else:
            agg_data__gp.v0 = agg_data__gp.v0 + 1
            agg_data__gp.v2.append(row__gp["date_of_birth"])
            agg_data__gp.v3[row__gp["sku"]] = agg_data__gp.v3[
                row__gp["sku"]
            ] + (row__gp["total_earnings"] or 0)
        if agg_data__gp.v1 is _none:
            if row__gp["total_earnings"] is not None:
                agg_data__gp.v1 = (1, row__gp["total_earnings"] * 1)
        else:
            if row__gp["total_earnings"] is not None:
                agg_data__gp.v1 = (
                    agg_data__gp.v1[0] + 1,
                    agg_data__gp.v1[1] + row__gp["total_earnings"] * 1,
                )

    return [
        {
            "date": signature__gp,
            "number_of_workers": (
                0 if agg_data__gp.v0 is _none else agg_data__gp.v0
            ),
            "average_earnings": (
                None
                if agg_data__gp.v1 is _none
                else (agg_data__gp.v1[1] / agg_data__gp.v1[0])
            ),
            "median_date_of_birth": (
                None
                if agg_data__gp.v2 is _none
                else _naive["median_59"](agg_data__gp.v2)
            ),
            "earnings_by_sku": (
                None if agg_data__gp.v3 is _none else (dict(agg_data__gp.v3))
            ),
        }
        for signature__gp, agg_data__gp in signature_to_agg_data__gp.items()
    ]


def converter_uk(data_):
    global __naive_values__, __none__
    _naive = __naive_values__
    _none = __none__
    _labels = {}
    return _naive["group_by__gp"](
        _naive["group_by__cg"](data_, _naive, _labels, _none),
        _naive,
        _labels,
        _none,
    )

Так как все конверсии являются выражениями, метод pipe позволяет комбинировать практически любые из них. pipe-ы разумно воспринимать как бесплатную с точки зрения производительности абстракцию, так как в случаях, когда не требуется добавление label-ов и входные данные не используются более одного раза, pipe не приводит к дополнительному вызову функции.

2) Table - потоковая обработка табличных данных

Время от времени у меня появлялась необходимость в работе с табличными данными (или схожими с ними). И меня всегда прельщала мысль "запечь" в код индексы колонок, чтобы обращения к ним были дешевыми. И вот, наконец, имея под рукой примитивы convtools, получилось реализовать задуманное, совершая малое количество телодвижений.

Логика работы: Table работает с итераторами лениво, вычитывая только первую строку, в которой может содержаться заголовок. Далее, опираясь на заголовок и запрошенные изменения, пишется код, который выполнит нужные преобразования над имеющимся итератором. Методы into_iter_rows и into_csv являются конечными.

from convtools.contrib.tables import Table
from convtools import conversion as c

(
    Table.from_csv(
        "tests/csvs/ac.csv",
        header=True,
        dialect=Table.csv_dialect(delimiter="\t"),
    )
    # возьмем только колонки "a" и "c"
    .take("a", "c")
    # добавим две новых вычисляемых колонки
    .update(B=c.col("a") + c.col("c"), D=c.call_func(abs, c.col("c")))
    # переименуем
    .rename({"a": "A"})
    # отфильтруем строки
    .filter(c.col("c") < 10)
    # отбросим колонку "c"
    .drop("c").into_csv("tests/csvs/out.csv")
)


# объединим две таблицы по колонке "a"
list(
    Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
    .join(
        Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
        how="inner",
        on=["a"],
    )
    .into_iter_rows(dict)  # поддерживаются list/tuple
)

Также добавлены два изредка полезных метода:

"""
Table 1      Table 2
| a | b |    | b | c |
| 1 | 2 |    | 3 | 4 |

>>> table1.chain(table2, fill_value=" ")

Result:
| a | b | c |
| 1 | 2 |   |
|   | 3 | 4 |
"""
"""
Table 1      Table 2
| a | b |    | b | c |
| 1 | 2 |    | 3 | 4 |
             | 5 | 6 |
>>> table1.zip(table2, fill_value=" ")

Result:
| a | b | b | c |
| 1 | 2 | 3 | 4 |
|   |   | 5 | 6 |
"""

Заключение

Конечно, я не питаю иллюзий, что все резко бросятся использовать convtools, но всё равно захотелось поделиться. И даже не потому, что codegen добавляет гибкость, просто меня написание кода в таком стиле ставит на другие рельсы. С таким подходом я допускаю значительно меньше ошибок, становятся более очевидными стадии обработки данных, облегчается тестирование кода, т.к. работа ведется с чистыми "почти-функциями" + convtools поощряет работу с iterator-ами, что положительно сказывается на потребляемой памяти.

Всем успехов и, если что-нибудь придумаю или подскажете, до скорого!

Комментарии (2)


  1. eigrad
    18.10.2021 13:20

    Чтобы это не было отвратительно медленно в конечном итоге всё равно придется завести какую-нибудь громоздкую dask/spark/flink/datafusion.


    1. westandskif Автор
      18.10.2021 19:02

      Если были проведены какие-то тесты, то приложите, пожалуйста.

      Т.к. своё тестирование на скорость я не считаю сколько бы то нибыло полным, я его не приводил. Но в качестве ответа на комментарий приведу пример:

      На Python 3.6.12 клеил 2 одинаковых CSV файла размером 1.3МБ, 7.7к строк каждый по значениям в 4 колонках.
      На выходе получался CSV файл размером 143МБ. Сравнивались результаты, всё сошлось.
      Далее я исключил запись на диск и сравнил скорость:
      - Table took 1.9088249206542969 s
      - Pandas took 3.9153363704681396 s
      - Polars took 2.842676877975464 s

      Поэтому "отвратительно медленно" это скорее открытый вопрос, на самом деле нужно проверять насколько :) Все зависит от условий.

      P.S. я не предлагаю обрабатывать тонны данных, скорее предлагаю иметь в виду такой инструмент, т.к. простота конечного кода порой выливается в приемлемой скорости.
      Ну и неплохой бонус: если через Table вычитать файл и сложить обратно на диск, можно верить, что файл не изменился.
      С pandas нужно поднапрячься, чтобы файл не мутировал.

      P.S.2 Если задача в анализе сотен гигов и более, то можно вгрузить в Clickhouse и там удобно анализировать