Всем привет!

Обработка и анализ временных последовательностей (временных рядов) достаточно часто встречающаяся задача. Обычно она решается с помощью идентичных подходов и методов. Однако когда анализ временного ряда предполагает выражение каждого последующего элемента через предыдущие, возникают проблемы с эффективностью реализации такого анализа. Это особенно актуально в контексте больших данных.

Данная статья демонстрирует подход к анализу и вычислению рекуррентных соотношений. В качестве примера я представлю реализацию на базе Apache Spark и Python метода экспоненциальной скользящей средней с использованием DataFrame API. Мы также рассмотрим метод агрегации данных, совместимый со Spark Connect, который был добавлен в версию 3.1 (для Scala - начиная с версии фреймворка 3.0), а именно – функцию aggregate.

Функция aggregate

Функция pyspark.sql.functions.aggregate (далее просто aggregate) создавалась для обработки массивных типов данных в наборах DataFrame. У неё есть несколько входных параметров:

  • col – имя или объект колонки, для которой функция была вызвана;

  • initalValue – стартовое значение для расчёта;

  • merge – функция слияния данных, принимает на вход два параметра типа Column, где первый параметр – это аккумулированное значение, а второй параметр – это текущее значение из массива;

  • finish – необязательная функция, выполняемая для конечного аккумулированного значения.

Согласно документации для фреймворка Spark версии 3.5, функция aggregate не подразумевает встраивания каких-либо вызовов функций-udf на Python в функцию merge, но разрешает это на Scala. А также, начиная с версии 3.5, для неё существует функция псевдоним с именем pyspark.sql.functions.reduce.

В качестве примера в документации приводится сценарий вычисления среднего значения элементов массива чисел [20.0, 4.0, 2.0, 6.0, 10.0], результатом расчёта которого будет 8.4:

df = spark.createDataFrame([(1, [20.0, 4.0, 2.0, 6.0, 10.0])], ("id", "values"))
def merge(acc, x):
    count = acc.count + 1
    sum = acc.sum + x
    return struct(count.alias("count"), sum.alias("sum"))
df.select(
    aggregate(
        "values",
        struct(lit(0).alias("count"), lit(0.0).alias("sum")),
        merge,
        lambda acc: acc.sum / acc.count,
    ).alias("mean")
).show()

Рассмотрим пример подробнее. В качестве значения col было использовано имя колонки, содержащей массив чисел, а в качестве стартового значения была передана композиция двух значений – count и sum, равные нулю.

Функция merge передаётся по имени и вызывается лишь один раз в момент формирования вычислительного плана. Сама функция инкапсулирует логику, встраиваемый сегмент в вычислительный план. Далее данная логика будет использована Spark’ом для вычисления последующих значений для каждого элемента массива.

Иначе говоря, первое значение данного объекта было задано как struct(lit(0).alias("count"), lit(0.0).alias("sum")), в дальнейшем оно было передано в качестве аргумента параметру с именем acc функции merge. Переменная x функции merge представляет собой отображение элемента из массива и имеет также тип Column, а план расчёта нового значения представлен композицией объектов вида struct(count.alias("count"), sum.alias("sum")).

Последний аргумент функции aggregate представлен в виде лямбда функции и производит действие с объектом Column, в данном случае после прохода всех элементов в массиве выполняет деление суммы на число элементов.

Пример вычислительного плана для Spark версии 3.5.4:
== Physical Plan ==
Project [aggregate(values#194, [0,0.0], lambdafunction(struct(count, (lambda x_0#202.count + 1), sum, (lambda x_0#202.sum + lambda y_1#203)), lambda x_0#202, lambda y_1#203, false), lambdafunction((lambda x_2#204.sum / cast(lambda x_2#204.count as double)), lambda x_2#204, false)) AS mean#201]
+- *(1) Scan ExistingRDD[id#193L,values#194]

Принцип работы функции aggregate крайне схож с принципом работы функции свёртки, и ближайшим её аналогом будет функция foldLeft из языка Scala или функция reduce из языка Python.

Если опустить финальный этап расчёта среднего значения и объединить каждый вычисленный элемент массива с предыдущими в новый массив, то таким образом можно сформировать новую последовательность данных. Данный подход может быть полезен для расчёта рекуррентных соотношений.

Применим данную гипотезу к примеру расчёта методом экспоненциальной средней.

Метод экспоненциальной скользящей средней (EMA)

Метод скользящего среднего достаточно широко используется в экономике и в других предметных областях для сглаживания временных рядов с целью исключения случайных событий. В частности, можно найти утверждение, что данный метод в вопросах экономики помогает проводить оценки ВВП, показателей занятости и других макроэкономических индикаторов (например, рынок ценных бумаг).

Существует несколько разновидностей данного метода. Примеры наиболее распространённых:

  • простое скользящее окно - (SMA - simple moving average);

  • взвешенное скользящее окно - (WMA - weighted moving average);

  • экспоненциальная скользящая средняя - (EMA - exponential moving average).

Есть также и иные более специфичные разновидности, но в данной статье будет показано применение такого метода как EMA. В первую очередь из-за его популярности при анализе стоимости ценных бумаг.

Перед тем как перейдем непосредственно к реализации метода EMA на PySpark, детальнее рассмотрим сам метод и его алгебраическое представление. Стоит отметить, что знать, как реализуются и работают другие разновидности метода скользящей средней для понимания работы EMA не нужно.

Метод EMA представляется в виде линейного алгебраического выражения:

EMA_{n}=EMA_{n-1}+\alpha *\left(data_{n}-EMA_{n-1}\right)

Где:

  • EMA_{n-1}— это предыдущее вычисленное значение;

  • data_{n}— текущие данные(значение);

  •  \alpha— сглаживающий коэффициент от 0 до 1 вычисляемый по формуле \frac{2}{n+1};

  • n— количество значений за период.

Первое значение EMA вычисляется как среднее значение за некоторый период, определяемый переменной n:

EMA_{0}=\frac{data_{1}+data_{2}+data_{3}+...+data_{n}}{n}

Определив алгебраическую составляющую, сразу можем определить для себя одну важную проблему: каждое следующее значение зависит от предыдущего рассчитанного. Это значит, что перед нами рекуррентное соотношение.

Завершив знакомство с методом EMA, рассмотрим способ его реализации на PySpark.

Реализация метода EMA

Данные для текущего примера были взяты с портала Kaggle. В наборе данных представлены колебания стоимости ценных бумаг для американских корпораций, таких как Apple, Adobe, AMD, Google, Intel, Microsoft и Nvidia. Набор данных представляет собой следующие значения:

  • Date — дата торгов;

  • Open — цена за акцию в начале торгов;

  • High — наивысшая цена за акцию;

  • Low — самая низкая цена за акцию;

  • Close — цена на момент закрытия торгов (итоговый показатель);

  • Volume — показатель среднего объёма (оборот за одну торговую сессию);

  • OpenInt — открытый интерес (в данном наборе данных он нулевой и не участвует в анализе).

Из всех колонок, представленных в текущих наборах данных, нас интересуют только колонка Date (дата торгов) и Close (итоговый показатель цены за акцию).

Каждый файл в наборе данных содержит в себе информацию по одной корпорации и носит в себе сокращённое наименование, по которому уникальным образом идентифицируется компания на рынке акций. Эту информацию мы будет использовать для обогащения данных уникальным ключом. Так мы будем определять, для какой из компаний мы строим анализ методом EMA.

Временные промежутки, в рамках которых представлены данные по компаниям, имеют разное начало, но все они оканчиваются 2017 годом. Ниже представлен код подготовки данных для дальнейшего анализа.

stocks_schema = StructType([
    StructField('Date', TimestampType()),
    StructField('Open', FloatType()),
    StructField('High', FloatType()),
    StructField('Low', FloatType()),
    StructField('Close', FloatType()),
    StructField('Volume', IntegerType()),
    StructField('OpenInt', FloatType()),
])
stocks_df = (
    spark.read
    .csv('./data/stocks', sep=',', inferSchema=True, 
         header=True, dateFormat='yyyy-MM-dd')
    .where(f.col('close').isNotNull() & (f.col('close') > 0))
    .where(f.col('date') >= f.to_timestamp(f.lit('2017-01-01')))
    .withColumn('stock_comp_name', 
                f.element_at(f.split(f.input_file_name(), '/'), -1))
    .dropna()
)
company_names = ['a.us.txt', 'adbe.us.txt', 'amd.us.txt',
                 'goog.us.txt', 'intc.us.txt', 'msft.us.txt', 'nvda.us.txt']
comp_df_list = [stocks_df.where(f.col('stock_comp_name') == comp_name) for comp_name in company_names]
stocks_df = reduce(lambda x, y: x.unionByName(y), comp_df_list)

Завершив предварительную подготовку данных, перейдём к реализации метода EMA с помощью функции aggregate. Возьмём за пример данные для компании Adobe с 2017 года и самостоятельно рассчитаем в качестве примера первое значение для 5-го периода (в данной статье не рассматриваются рекомендации по выбору первого периода) и для двух последующих:

Date

Open

High

Low

Close

Volume

OpenInt

1

2017-01-03

103.43

104.03

102.81

103.48

2244021

0

2

2017-01-04

103.74

104.37

103.495

104.14

1653606

0

3

2017-01-05

104.13

106.02

103.77

105.91

2423943

0

4

2017-01-06

105.98

108.43

105.25

108.3

2586821

0

5

2017-01-09

107.96

108.79

107.56

108.57

2702977

0

6

2017-01-10

108.57

108.79

107.61

108.26

2957500

0

7

2017-01-11

108.23

109.05

108

108.99

1637329

0

8

2017-01-12

107.99

108.74

107.16

108.59

1591516

0

9

2017-01-13

108.84

108.9

107.98

108.53

1531230

0

10

2017-01-17

107.79

108.05

107.06

108

1667538

0

11

2017-01-18

108.5

109.17

107.53

108.79

2480734

0

12

2017-01-19

108.63

109.98

108.04

109.79

3007274

0

13

2017-01-20

110.02

110.81

109.57

110.71

3079168

0

14

2017-01-23

110.71

111.92

110.3

110.98

2404494

0

15

2017-01-24

111.46

114.17

111.34

113.72

3545803

0

16

2017-01-25

113.77

114.57

113.29

114.26

3729748

0

17

2017-01-26

113.79

114.44

112.73

112.88

2703751

0

18

2017-01-27

113.07

114.01

112.75

113.99

1663256

0

19

2017-01-30

113.22

113.87

112.26

113.82

2104222

0

20

2017-01-31

113.22

113.75

112.695

113.38

1568978

0

Сначала необходимо вычислить среднее значение для первого периода расчёта (в нашем случае это пятый период), а затем, используя предыдущее вычисленное значение, получить последующие:

  • Для первых пяти периодов мы получим среднее значение равное 106.08 которое мы рассчитали по формуле:

    \frac{(103.48+104.14+105.91+108.3+108.57)}{5};

  • Рассчитаем коэффициент alpha, необходимый для последующих расчётов, а именно alpha=2/\left(n+1\right)=0.33где n – это количество периодов равное значению 5;

  • Используя ранее полученное значение для пятого периода и рассчитанный коэффициент alpha, мы получим значение для шестого периода:106.08+0.33*\left(108.26–106.08\right)=106.8;

  • Дальнейший расчёт для седьмого периода будет содержать в себе предыдущее значение шестого периода, а именно: 106.8+0.33*\left(108.99–106.8\right)=107.5и т. д.

Ниже приведён полный расчёт первых 20 значений для компании Adobe, а также график сравнения фактических (close) и предсказанных (predict) значений для демонстрации сглаживающего характера метода EMA.

Date

stock_comp_name

close

predict

1

2017-01-03

adbe.us.txt

103.48

NULL

2

2017-01-04

adbe.us.txt

104.14

NULL

3

2017-01-05

adbe.us.txt

105.91

NULL

4

2017-01-06

adbe.us.txt

108.3

NULL

5

2017-01-09

adbe.us.txt

108.57

106.08

6

2017-01-10

adbe.us.txt

108.26

106.80666666666667

7

2017-01-11

adbe.us.txt

108.99

107.53444444444445

8

2017-01-12

adbe.us.txt

108.59

107.8862962962963

9

2017-01-13

adbe.us.txt

108.53

108.10086419753087

10

2017-01-17

adbe.us.txt

108.0

108.06724279835392

11

2017-01-18

adbe.us.txt

108.79

108.30816186556928

12

2017-01-19

adbe.us.txt

109.79

108.80210791037952

13

2017-01-20

adbe.us.txt

110.71

109.43807194025301

14

2017-01-23

adbe.us.txt

110.98

109.95204796016867

15

2017-01-24

adbe.us.txt

113.72

111.20803197344578

16

2017-01-25

adbe.us.txt

114.26

112.22535464896386

17

2017-01-26

adbe.us.txt

112.88

112.44356976597591

18

2017-01-27

adbe.us.txt

113.99

112.95904651065061

19

2017-01-30

adbe.us.txt

113.82

113.2460310071004

20

2017-01-31

adbe.us.txt

113.38

113.29068733806693

График демонстрирующий исходные значения (close) и предсказанные (predict)
График демонстрирующий исходные значения (close) и предсказанные (predict)

Теперь рассмотрим реализацию этого метода на pySpark с использованием функции aggregate. Сначала создадим класс, который будет описывать сам метод:

class EMA:
    def __init__(self, period: int, target: str, result: str, groupby: List[str], sortby: List[str]):
        self.__period: int = period
        self.__a: float = 2 / (self.__period + 1)
        self.__target: str = target
        self.__result: str = result
        self.__groupby: List[str] = groupby
        self.__sortby: List[str] = sortby

Для класса EMA были определены следующие его параметры (поля):

  • period – период или n, для которого будет вычислено первое значение;

  • target – колонка, содержащая целевое значение для анализа;

  • result – имя итоговой результирующей колонки с рассчитанными значениями (не подразумевается, что она существует в исходном наборе данных);

  • groupby – список колонок, по которым будет выполнена агрегация. В данном случае это одна колонка stock_comp_name, содержащая в себе название организации;

  • sortby – колонки, по которым будет выполнена сортировка исходных значений по выбранным ключам;

  • alpha (или просто “a”) будет вычислено по выше определённому правилу.

Логику расчёта экспоненциальной средней инкапсулируем в функции fit_predict, как представлено на примере ниже:

def fit_predict(self, input_df: DataFrame) -> DataFrame:
    input_df = (
        input_df
        .withColumn('i', f.lit(0).cast(IntegerType()))
        .withColumn(self.__result, f.lit(None).cast(DoubleType()))
    )

    input_cols = [*self.__sortby, *self.__groupby, self.__target]
    period = self.__period - 1
    alpha = self.__a

    def model(accum: Column, cur_item: Column):
        prev_item = f.element_at(accum, -1)
        cur_i = prev_item['i'] + 1
        cur_result = (
            f.when(cur_i < period, prev_item[self.__result] + cur_item[self.__target])
            .when(cur_i == period, (prev_item[self.__result] + cur_item[self.__target]) / (self.__period))
            .otherwise(prev_item[self.__result] + alpha * (cur_item[self.__target] - prev_item[self.__result]))
        )
        main_cols = [cur_item[col].alias(col) for col in input_cols]
        new_record = f.array(f.struct(*main_cols, cur_i.alias('i'), cur_result.alias(self.__result)))
        return f.array_union(accum, new_record)

    initial_struct_cols = [f.col('partition_array').getItem(0)[col].alias(col) for col in [*input_cols, 'i']]
    return (
        input_df
        .select(*input_cols, 'i', self.__result)
        .groupBy(*self.__groupby)
        .agg(
            f.sort_array(
                f.collect_list(f.struct(*input_cols, 'i', self.__result))
            ).alias('partition_array')
        )
        .withColumn('new_partitioned_array', f.aggregate(
            f.slice('partition_array', 2, f.size('partition_array')),
            f.array(f.struct(*initial_struct_cols, f.col('partition_array').getItem(0)[self.__target].alias(self.__result))),
            model
            )
        ).select(f.inline('new_partitioned_array'))
    ).withColumn(self.__result, f.when(f.col('i') < period, None).otherwise(f.col(self.__result))).drop('i')

Подробнее рассмотрим данную функцию.

  input_df = (
      input_df
      .withColumn('i', f.lit(0).cast(IntegerType()))
      .withColumn(self.__result, f.lit(None).cast(DoubleType()))
  )

  input_cols = [*self.__sortby, *self.__groupby, self.__target]
  period = self.__period - 1
  alpha = self.__a

В начальном сегменте кода функции происходит обогащение исходного набора данных путём инициализации новых колонок:

  • i – для индексации расчёта временного ряда;

  • переданного значения result, в нашем случае значение данной переменой было присвоено как predict.

Также выполняется инициализация дополнительных переменных, то есть создаётся список колонок input_cols и устанавливается их последовательность. Таким образом мы оставляем только необходимые поля для дальнейшей работы, а их последовательность важна для дальнейшей сортировки данных.

Далее устанавливаем значение period на единицу меньше, так как наша индексация начинается с нуля, а присвоение значения поля 'a' переменной alpha несёт в себе исключительно эстетический характер.

В нашем случае функции merge имеет имя model и содержит в себе математическое представление метода-EMA.

def model(accum: Column, cur_item: Column):
    prev_item = f.element_at(accum, -1)
    cur_i = prev_item['i'] + 1
    cur_result = (
        f.when(cur_i < period, prev_item[self.__result] + cur_item[self.__target])
        .when(cur_i == period, (prev_item[self.__result] + cur_item[self.__target]) / (self.__period))
        .otherwise(prev_item[self.__result] + alpha * (cur_item[self.__target] - prev_item[self.__result]))
    )
    main_cols = [cur_item[col].alias(col) for col in input_cols]
    new_record = f.array(f.struct(*main_cols, cur_i.alias('i'), cur_result.alias(self.__result)))
    return f.array_union(accum, new_record)

Сначала в переменную prev_item устанавливается последний элемент массива из переменной accum. Далее для переменной cur_i мы получаем доступ к предыдущему значению поля i через нотацию квадратные скобки и увеличиваем её на единицу.

Сам результат расчёта присваивается переменной cur_result по следующей логике:

  • если значение текущего поля i меньше значения периода (оно равно исходному значению – 1), то суммируем предыдущее значение из поля result (в нашем случае это будет поле predict) и из поля target (оно же close из исходного набора данных);

  • если мы достигаем значения i равного period, то есть первого периода расчёта, то используя накопленную сумму выполняем деление на количество периодов, тем самым получая среднее значение, в нашем случае за 5-ть периодов;

  • дальнейший расчёт происходит по выше представленной формуле EMA.

Для сохранения всех новых результатов в форме исходных строк мы выполняем объединение новых рассчитанных полей с предыдущими из переменной accum с помощью функции array_union. Таким образом выполняется не схлопывание данных к некоторому одному значению, а формируется композиция новых строк в массивы, которые в дальнейшем будут развёрнуты в новые строки.

Заключительный этап работы функции fit_predict представляет собой конфигурацию вызова функции aggregate и выполнения необходимых трансформаций промежуточного результата вычисления в конечный набор данных.

initial_struct_cols = [f.col('partition_array').getItem(0)[col].alias(col) for col in [*input_cols, 'i']]
return (
      input_df
      .select(*input_cols, 'i', self.__result)
      .groupBy(*self.__groupby)
      .agg(
        f.sort_array(
              f.collect_list(f.struct(*input_cols, 'i', self.__result))
          ).alias('partition_array')
      )
      .withColumn('new_partitioned_array', f.aggregate(
          f.slice('partition_array', 2, f.size('partition_array')),
          f.array(f.struct(*initial_struct_cols, f.col('partition_array').getItem(0)[self.__target].alias(self.__result))),
          model)
      ).select(f.inline('new_partitioned_array'))
  ).withColumn(self.__result, f.when(f.col('i') < period, None).otherwise(f.col(self.__result))).drop('i')

В переменной initial_struct_cols сохраняем массив входных полей для первого аргумента функции aggregate.

Перед определением колонки new_partitioned_array, которая будет содержать массив рассчитанных значений выполняем подготовку набора данных для функции aggregate. С помощью комбинации функций collect_list и sort_array мы получаем отсортированный массив строк по выбранным ключам.

Выполнив схлопывание данных, сконфигурируем параметры для функции aggregate следующим образом:

  • Первому параметру мы передадим срез массива начиная со второго элемента, то есть просто пропускаем первый элемент массива.

  • Для второго параметра в качестве входного значения используем первый элемент массива, причём он представлен также массивом из одного элемента.

  • Далее для третьего параметра в качестве аргумента передадим функцию model.

Если подробнее рассмотреть входное значение второго параметра функции aggregate, то можно заметить, что в качестве первого значения для колонки с именем из переменной self.__result устанавливается значение из колонки target. Это сделано, чтобы сумма первых двух элементов последовательности в рамках реализованной логики прошла корректно.

В конце выполняется развёртывание массива в набор строк с помощью функции inline и дополнительное преобразование полученного набора данных к итоговому виду. Новый набор данных будет в себе содержать ключи сортировки, ключи группировки, а также целевое и рассчитанное значение.

Итоговый вызов модели осуществляется следующим образом:

ema_df = EMA(period=5, target='close', result='predict', 
             groupby=['stock_comp_name'], sortby=['Date'])
predicted_df = ema.fit_predict(stocks_df)

Если используется фреймворк Spark ниже версии 3.4.0, то некоторые части алгоритма, следует изменить как представлено на примере ниже:

Пример вызова функции aggregate для PySpark версии ниже 3.4.0
return (
    input_df
    .select(*input_cols, 'i', self.__result)
    .groupBy(*self.__groupby)
    .agg(
        f.sort_array(
            f.collect_list(f.struct(*input_cols, 'i', self.__result))
        ).alias('partition_array')
    )
    .withColumn('new_partitioned_array', f.aggregate(
        f.expr('slice(partition_array, 2, size(partition_array))'),
        f.array(f.struct(*initial_struct_cols, f.col('partition_array').getItem(0)[self.__target].alias(self.__result))),
        model
        )
    ).selectExpr('inline(new_partitioned_array)')
).withColumn(self.__result, f.when(f.col('i') < period, None).otherwise(f.col(self.__result))).drop('i')

В данном примере прямые вызовы функций были заменены на выражения в связи с тем, что возможность вызывать напрямую в PySpark такие функции как slice, size и inline была добавлено в версии начиная с 3.4.0.

Итог

В рассмотренном мной примере показано, что функция aggregate достаточно удобна для реализации рекуррентных алгоритмов, а её преимуществами является то, что она:

  • поддерживается для языков Python и Scala;

  • позволяет реализовать расчёт на кластере, минуя издержки при обработке данных на памяти драйвера;

  • в сравнении с аналогичным по функциональности методом UDAF (User-Defined Aggregate Functions) поддерживается для Spark Connect в версии фреймворка 3.4.0.

Пример, приведённый в настоящей статье, является синтетическим, хоть и сам метод EMA является достаточно популярным механизмом анализа биржевых котировок и анализа цен других типов активов, таких как криптовалюта и других деривативов.

Функцию aggregate я использовал неоднократно в разных задачах, связанных с обработкой временных рядов, таких как корректировка последовательности выполнения операций конвейерных линий или прогнозирование данных спроса модифицированным методом Хольта-Уинтерса для промышленных предприятий.

В реальных условиях у представленного метода мной не наблюдались какие-либо проблемы с масштабированием расчёта при условии наличия соответствующих ключей партиционирования и отсутствия явных перекосов в данных.

Исходные данные - https://www.kaggle.com/datasets/borismarjanovic/price-volume-data-for-all-us-stocks-etfs

Исходные коды - https://github.com/D3vA1ex/PySpark-EMA

Комментарии (2)


  1. Sagy
    10.10.2025 08:20

    Оо, автор почти дошел до iir, может и Калмана откроет для себя. В чем новизна то? Хоть бы АФЧХ построил фильтра, годографа там всякие, устойчивость, собственные значения регрессионной модели. А ещё поднятия есть разные, улам наконец


    1. D3vAlex Автор
      10.10.2025 08:20

      Немного не понимаю вашу претензию к новизне. Статья демонстрирует способ работы с рекурентными соотношенями на фреймворке, а представленный метод является лишь только примером.