Всем привет!
Обработка и анализ временных последовательностей (временных рядов) достаточно часто встречающаяся задача. Обычно она решается с помощью идентичных подходов и методов. Однако когда анализ временного ряда предполагает выражение каждого последующего элемента через предыдущие, возникают проблемы с эффективностью реализации такого анализа. Это особенно актуально в контексте больших данных.
Данная статья демонстрирует подход к анализу и вычислению рекуррентных соотношений. В качестве примера я представлю реализацию на базе Apache Spark и Python метода экспоненциальной скользящей средней с использованием DataFrame API. Мы также рассмотрим метод агрегации данных, совместимый со Spark Connect, который был добавлен в версию 3.1 (для Scala - начиная с версии фреймворка 3.0), а именно – функцию aggregate.
Функция aggregate
Функция pyspark.sql.functions.aggregate (далее просто aggregate) создавалась для обработки массивных типов данных в наборах DataFrame. У неё есть несколько входных параметров:
col – имя или объект колонки, для которой функция была вызвана;
initalValue – стартовое значение для расчёта;
merge – функция слияния данных, принимает на вход два параметра типа Column, где первый параметр – это аккумулированное значение, а второй параметр – это текущее значение из массива;
finish – необязательная функция, выполняемая для конечного аккумулированного значения.
Согласно документации для фреймворка Spark версии 3.5, функция aggregate не подразумевает встраивания каких-либо вызовов функций-udf на Python в функцию merge, но разрешает это на Scala. А также, начиная с версии 3.5, для неё существует функция псевдоним с именем pyspark.sql.functions.reduce.
В качестве примера в документации приводится сценарий вычисления среднего значения элементов массива чисел [20.0, 4.0, 2.0, 6.0, 10.0], результатом расчёта которого будет 8.4:
df = spark.createDataFrame([(1, [20.0, 4.0, 2.0, 6.0, 10.0])], ("id", "values"))
def merge(acc, x):
count = acc.count + 1
sum = acc.sum + x
return struct(count.alias("count"), sum.alias("sum"))
df.select(
aggregate(
"values",
struct(lit(0).alias("count"), lit(0.0).alias("sum")),
merge,
lambda acc: acc.sum / acc.count,
).alias("mean")
).show()
Рассмотрим пример подробнее. В качестве значения col было использовано имя колонки, содержащей массив чисел, а в качестве стартового значения была передана композиция двух значений – count и sum, равные нулю.
Функция merge передаётся по имени и вызывается лишь один раз в момент формирования вычислительного плана. Сама функция инкапсулирует логику, встраиваемый сегмент в вычислительный план. Далее данная логика будет использована Spark’ом для вычисления последующих значений для каждого элемента массива.
Иначе говоря, первое значение данного объекта было задано как struct(lit(0).alias("count"), lit(0.0).alias("sum"))
, в дальнейшем оно было передано в качестве аргумента параметру с именем acc функции merge. Переменная x функции merge представляет собой отображение элемента из массива и имеет также тип Column, а план расчёта нового значения представлен композицией объектов вида struct(count.alias("count"), sum.alias("sum"))
.
Последний аргумент функции aggregate представлен в виде лямбда функции и производит действие с объектом Column, в данном случае после прохода всех элементов в массиве выполняет деление суммы на число элементов.
Пример вычислительного плана для Spark версии 3.5.4:
== Physical Plan ==
Project [aggregate(values#194, [0,0.0], lambdafunction(struct(count, (lambda x_0#202.count + 1), sum, (lambda x_0#202.sum + lambda y_1#203)), lambda x_0#202, lambda y_1#203, false), lambdafunction((lambda x_2#204.sum / cast(lambda x_2#204.count as double)), lambda x_2#204, false)) AS mean#201]
+- *(1) Scan ExistingRDD[id#193L,values#194]
Принцип работы функции aggregate крайне схож с принципом работы функции свёртки, и ближайшим её аналогом будет функция foldLeft из языка Scala или функция reduce из языка Python.
Если опустить финальный этап расчёта среднего значения и объединить каждый вычисленный элемент массива с предыдущими в новый массив, то таким образом можно сформировать новую последовательность данных. Данный подход может быть полезен для расчёта рекуррентных соотношений.
Применим данную гипотезу к примеру расчёта методом экспоненциальной средней.
Метод экспоненциальной скользящей средней (EMA)
Метод скользящего среднего достаточно широко используется в экономике и в других предметных областях для сглаживания временных рядов с целью исключения случайных событий. В частности, можно найти утверждение, что данный метод в вопросах экономики помогает проводить оценки ВВП, показателей занятости и других макроэкономических индикаторов (например, рынок ценных бумаг).
Существует несколько разновидностей данного метода. Примеры наиболее распространённых:
простое скользящее окно - (SMA - simple moving average);
взвешенное скользящее окно - (WMA - weighted moving average);
экспоненциальная скользящая средняя - (EMA - exponential moving average).
Есть также и иные более специфичные разновидности, но в данной статье будет показано применение такого метода как EMA. В первую очередь из-за его популярности при анализе стоимости ценных бумаг.
Перед тем как перейдем непосредственно к реализации метода EMA на PySpark, детальнее рассмотрим сам метод и его алгебраическое представление. Стоит отметить, что знать, как реализуются и работают другие разновидности метода скользящей средней для понимания работы EMA не нужно.
Метод EMA представляется в виде линейного алгебраического выражения:
Где:
— это предыдущее вычисленное значение;
— текущие данные(значение);
— сглаживающий коэффициент от 0 до 1 вычисляемый по формуле
;
— количество значений за период.
Первое значение EMA вычисляется как среднее значение за некоторый период, определяемый переменной :
Определив алгебраическую составляющую, сразу можем определить для себя одну важную проблему: каждое следующее значение зависит от предыдущего рассчитанного. Это значит, что перед нами рекуррентное соотношение.
Завершив знакомство с методом EMA, рассмотрим способ его реализации на PySpark.
Реализация метода EMA
Данные для текущего примера были взяты с портала Kaggle. В наборе данных представлены колебания стоимости ценных бумаг для американских корпораций, таких как Apple, Adobe, AMD, Google, Intel, Microsoft и Nvidia. Набор данных представляет собой следующие значения:
Date — дата торгов;
Open — цена за акцию в начале торгов;
High — наивысшая цена за акцию;
Low — самая низкая цена за акцию;
Close — цена на момент закрытия торгов (итоговый показатель);
Volume — показатель среднего объёма (оборот за одну торговую сессию);
OpenInt — открытый интерес (в данном наборе данных он нулевой и не участвует в анализе).
Из всех колонок, представленных в текущих наборах данных, нас интересуют только колонка Date (дата торгов) и Close (итоговый показатель цены за акцию).
Каждый файл в наборе данных содержит в себе информацию по одной корпорации и носит в себе сокращённое наименование, по которому уникальным образом идентифицируется компания на рынке акций. Эту информацию мы будет использовать для обогащения данных уникальным ключом. Так мы будем определять, для какой из компаний мы строим анализ методом EMA.
Временные промежутки, в рамках которых представлены данные по компаниям, имеют разное начало, но все они оканчиваются 2017 годом. Ниже представлен код подготовки данных для дальнейшего анализа.
stocks_schema = StructType([
StructField('Date', TimestampType()),
StructField('Open', FloatType()),
StructField('High', FloatType()),
StructField('Low', FloatType()),
StructField('Close', FloatType()),
StructField('Volume', IntegerType()),
StructField('OpenInt', FloatType()),
])
stocks_df = (
spark.read
.csv('./data/stocks', sep=',', inferSchema=True,
header=True, dateFormat='yyyy-MM-dd')
.where(f.col('close').isNotNull() & (f.col('close') > 0))
.where(f.col('date') >= f.to_timestamp(f.lit('2017-01-01')))
.withColumn('stock_comp_name',
f.element_at(f.split(f.input_file_name(), '/'), -1))
.dropna()
)
company_names = ['a.us.txt', 'adbe.us.txt', 'amd.us.txt',
'goog.us.txt', 'intc.us.txt', 'msft.us.txt', 'nvda.us.txt']
comp_df_list = [stocks_df.where(f.col('stock_comp_name') == comp_name) for comp_name in company_names]
stocks_df = reduce(lambda x, y: x.unionByName(y), comp_df_list)
Завершив предварительную подготовку данных, перейдём к реализации метода EMA с помощью функции aggregate. Возьмём за пример данные для компании Adobe с 2017 года и самостоятельно рассчитаем в качестве примера первое значение для 5-го периода (в данной статье не рассматриваются рекомендации по выбору первого периода) и для двух последующих:
Date |
Open |
High |
Low |
Close |
Volume |
OpenInt |
|
1 |
2017-01-03 |
103.43 |
104.03 |
102.81 |
103.48 |
2244021 |
0 |
2 |
2017-01-04 |
103.74 |
104.37 |
103.495 |
104.14 |
1653606 |
0 |
3 |
2017-01-05 |
104.13 |
106.02 |
103.77 |
105.91 |
2423943 |
0 |
4 |
2017-01-06 |
105.98 |
108.43 |
105.25 |
108.3 |
2586821 |
0 |
5 |
2017-01-09 |
107.96 |
108.79 |
107.56 |
108.57 |
2702977 |
0 |
6 |
2017-01-10 |
108.57 |
108.79 |
107.61 |
108.26 |
2957500 |
0 |
7 |
2017-01-11 |
108.23 |
109.05 |
108 |
108.99 |
1637329 |
0 |
8 |
2017-01-12 |
107.99 |
108.74 |
107.16 |
108.59 |
1591516 |
0 |
9 |
2017-01-13 |
108.84 |
108.9 |
107.98 |
108.53 |
1531230 |
0 |
10 |
2017-01-17 |
107.79 |
108.05 |
107.06 |
108 |
1667538 |
0 |
11 |
2017-01-18 |
108.5 |
109.17 |
107.53 |
108.79 |
2480734 |
0 |
12 |
2017-01-19 |
108.63 |
109.98 |
108.04 |
109.79 |
3007274 |
0 |
13 |
2017-01-20 |
110.02 |
110.81 |
109.57 |
110.71 |
3079168 |
0 |
14 |
2017-01-23 |
110.71 |
111.92 |
110.3 |
110.98 |
2404494 |
0 |
15 |
2017-01-24 |
111.46 |
114.17 |
111.34 |
113.72 |
3545803 |
0 |
16 |
2017-01-25 |
113.77 |
114.57 |
113.29 |
114.26 |
3729748 |
0 |
17 |
2017-01-26 |
113.79 |
114.44 |
112.73 |
112.88 |
2703751 |
0 |
18 |
2017-01-27 |
113.07 |
114.01 |
112.75 |
113.99 |
1663256 |
0 |
19 |
2017-01-30 |
113.22 |
113.87 |
112.26 |
113.82 |
2104222 |
0 |
20 |
2017-01-31 |
113.22 |
113.75 |
112.695 |
113.38 |
1568978 |
0 |
Сначала необходимо вычислить среднее значение для первого периода расчёта (в нашем случае это пятый период), а затем, используя предыдущее вычисленное значение, получить последующие:
-
Для первых пяти периодов мы получим среднее значение равное 106.08 которое мы рассчитали по формуле:
;
Рассчитаем коэффициент alpha, необходимый для последующих расчётов, а именно
где
– это количество периодов равное значению 5;
Используя ранее полученное значение для пятого периода и рассчитанный коэффициент alpha, мы получим значение для шестого периода:
;
Дальнейший расчёт для седьмого периода будет содержать в себе предыдущее значение шестого периода, а именно:
и т. д.
Ниже приведён полный расчёт первых 20 значений для компании Adobe, а также график сравнения фактических (close) и предсказанных (predict) значений для демонстрации сглаживающего характера метода EMA.
Date |
stock_comp_name |
close |
predict |
|
1 |
2017-01-03 |
adbe.us.txt |
103.48 |
NULL |
2 |
2017-01-04 |
adbe.us.txt |
104.14 |
NULL |
3 |
2017-01-05 |
adbe.us.txt |
105.91 |
NULL |
4 |
2017-01-06 |
adbe.us.txt |
108.3 |
NULL |
5 |
2017-01-09 |
adbe.us.txt |
108.57 |
106.08 |
6 |
2017-01-10 |
adbe.us.txt |
108.26 |
106.80666666666667 |
7 |
2017-01-11 |
adbe.us.txt |
108.99 |
107.53444444444445 |
8 |
2017-01-12 |
adbe.us.txt |
108.59 |
107.8862962962963 |
9 |
2017-01-13 |
adbe.us.txt |
108.53 |
108.10086419753087 |
10 |
2017-01-17 |
adbe.us.txt |
108.0 |
108.06724279835392 |
11 |
2017-01-18 |
adbe.us.txt |
108.79 |
108.30816186556928 |
12 |
2017-01-19 |
adbe.us.txt |
109.79 |
108.80210791037952 |
13 |
2017-01-20 |
adbe.us.txt |
110.71 |
109.43807194025301 |
14 |
2017-01-23 |
adbe.us.txt |
110.98 |
109.95204796016867 |
15 |
2017-01-24 |
adbe.us.txt |
113.72 |
111.20803197344578 |
16 |
2017-01-25 |
adbe.us.txt |
114.26 |
112.22535464896386 |
17 |
2017-01-26 |
adbe.us.txt |
112.88 |
112.44356976597591 |
18 |
2017-01-27 |
adbe.us.txt |
113.99 |
112.95904651065061 |
19 |
2017-01-30 |
adbe.us.txt |
113.82 |
113.2460310071004 |
20 |
2017-01-31 |
adbe.us.txt |
113.38 |
113.29068733806693 |

Теперь рассмотрим реализацию этого метода на pySpark с использованием функции aggregate. Сначала создадим класс, который будет описывать сам метод:
class EMA:
def __init__(self, period: int, target: str, result: str, groupby: List[str], sortby: List[str]):
self.__period: int = period
self.__a: float = 2 / (self.__period + 1)
self.__target: str = target
self.__result: str = result
self.__groupby: List[str] = groupby
self.__sortby: List[str] = sortby
Для класса EMA были определены следующие его параметры (поля):
period – период или n, для которого будет вычислено первое значение;
target – колонка, содержащая целевое значение для анализа;
result – имя итоговой результирующей колонки с рассчитанными значениями (не подразумевается, что она существует в исходном наборе данных);
groupby – список колонок, по которым будет выполнена агрегация. В данном случае это одна колонка stock_comp_name, содержащая в себе название организации;
sortby – колонки, по которым будет выполнена сортировка исходных значений по выбранным ключам;
alpha (или просто
) будет вычислено по выше определённому правилу.
Логику расчёта экспоненциальной средней инкапсулируем в функции fit_predict, как представлено на примере ниже:
def fit_predict(self, input_df: DataFrame) -> DataFrame:
input_df = (
input_df
.withColumn('i', f.lit(0).cast(IntegerType()))
.withColumn(self.__result, f.lit(None).cast(DoubleType()))
)
input_cols = [*self.__sortby, *self.__groupby, self.__target]
period = self.__period - 1
alpha = self.__a
def model(accum: Column, cur_item: Column):
prev_item = f.element_at(accum, -1)
cur_i = prev_item['i'] + 1
cur_result = (
f.when(cur_i < period, prev_item[self.__result] + cur_item[self.__target])
.when(cur_i == period, (prev_item[self.__result] + cur_item[self.__target]) / (self.__period))
.otherwise(prev_item[self.__result] + alpha * (cur_item[self.__target] - prev_item[self.__result]))
)
main_cols = [cur_item[col].alias(col) for col in input_cols]
new_record = f.array(f.struct(*main_cols, cur_i.alias('i'), cur_result.alias(self.__result)))
return f.array_union(accum, new_record)
initial_struct_cols = [f.col('partition_array').getItem(0)[col].alias(col) for col in [*input_cols, 'i']]
return (
input_df
.select(*input_cols, 'i', self.__result)
.groupBy(*self.__groupby)
.agg(
f.sort_array(
f.collect_list(f.struct(*input_cols, 'i', self.__result))
).alias('partition_array')
)
.withColumn('new_partitioned_array', f.aggregate(
f.slice('partition_array', 2, f.size('partition_array')),
f.array(f.struct(*initial_struct_cols, f.col('partition_array').getItem(0)[self.__target].alias(self.__result))),
model
)
).select(f.inline('new_partitioned_array'))
).withColumn(self.__result, f.when(f.col('i') < period, None).otherwise(f.col(self.__result))).drop('i')
Подробнее рассмотрим данную функцию.
input_df = (
input_df
.withColumn('i', f.lit(0).cast(IntegerType()))
.withColumn(self.__result, f.lit(None).cast(DoubleType()))
)
input_cols = [*self.__sortby, *self.__groupby, self.__target]
period = self.__period - 1
alpha = self.__a
В начальном сегменте кода функции происходит обогащение исходного набора данных путём инициализации новых колонок:
i – для индексации расчёта временного ряда;
переданного значения result, в нашем случае значение данной переменой было присвоено как predict.
Также выполняется инициализация дополнительных переменных, то есть создаётся список колонок input_cols и устанавливается их последовательность. Таким образом мы оставляем только необходимые поля для дальнейшей работы, а их последовательность важна для дальнейшей сортировки данных.
Далее устанавливаем значение period на единицу меньше, так как наша индексация начинается с нуля, а присвоение значения поля 'a' переменной alpha несёт в себе исключительно эстетический характер.
В нашем случае функции merge имеет имя model и содержит в себе математическое представление метода-EMA.
def model(accum: Column, cur_item: Column):
prev_item = f.element_at(accum, -1)
cur_i = prev_item['i'] + 1
cur_result = (
f.when(cur_i < period, prev_item[self.__result] + cur_item[self.__target])
.when(cur_i == period, (prev_item[self.__result] + cur_item[self.__target]) / (self.__period))
.otherwise(prev_item[self.__result] + alpha * (cur_item[self.__target] - prev_item[self.__result]))
)
main_cols = [cur_item[col].alias(col) for col in input_cols]
new_record = f.array(f.struct(*main_cols, cur_i.alias('i'), cur_result.alias(self.__result)))
return f.array_union(accum, new_record)
Сначала в переменную prev_item устанавливается последний элемент массива из переменной accum. Далее для переменной cur_i мы получаем доступ к предыдущему значению поля i через нотацию квадратные скобки и увеличиваем её на единицу.
Сам результат расчёта присваивается переменной cur_result по следующей логике:
если значение текущего поля i меньше значения периода (оно равно исходному значению – 1), то суммируем предыдущее значение из поля result (в нашем случае это будет поле predict) и из поля target (оно же close из исходного набора данных);
если мы достигаем значения i равного period, то есть первого периода расчёта, то используя накопленную сумму выполняем деление на количество периодов, тем самым получая среднее значение, в нашем случае за 5-ть периодов;
дальнейший расчёт происходит по выше представленной формуле EMA.
Для сохранения всех новых результатов в форме исходных строк мы выполняем объединение новых рассчитанных полей с предыдущими из переменной accum с помощью функции array_union. Таким образом выполняется не схлопывание данных к некоторому одному значению, а формируется композиция новых строк в массивы, которые в дальнейшем будут развёрнуты в новые строки.
Заключительный этап работы функции fit_predict представляет собой конфигурацию вызова функции aggregate и выполнения необходимых трансформаций промежуточного результата вычисления в конечный набор данных.
initial_struct_cols = [f.col('partition_array').getItem(0)[col].alias(col) for col in [*input_cols, 'i']]
return (
input_df
.select(*input_cols, 'i', self.__result)
.groupBy(*self.__groupby)
.agg(
f.sort_array(
f.collect_list(f.struct(*input_cols, 'i', self.__result))
).alias('partition_array')
)
.withColumn('new_partitioned_array', f.aggregate(
f.slice('partition_array', 2, f.size('partition_array')),
f.array(f.struct(*initial_struct_cols, f.col('partition_array').getItem(0)[self.__target].alias(self.__result))),
model)
).select(f.inline('new_partitioned_array'))
).withColumn(self.__result, f.when(f.col('i') < period, None).otherwise(f.col(self.__result))).drop('i')
В переменной initial_struct_cols сохраняем массив входных полей для первого аргумента функции aggregate.
Перед определением колонки new_partitioned_array, которая будет содержать массив рассчитанных значений выполняем подготовку набора данных для функции aggregate. С помощью комбинации функций collect_list и sort_array мы получаем отсортированный массив строк по выбранным ключам.
Выполнив схлопывание данных, сконфигурируем параметры для функции aggregate следующим образом:
Первому параметру мы передадим срез массива начиная со второго элемента, то есть просто пропускаем первый элемент массива.
Для второго параметра в качестве входного значения используем первый элемент массива, причём он представлен также массивом из одного элемента.
Далее для третьего параметра в качестве аргумента передадим функцию model.
Если подробнее рассмотреть входное значение второго параметра функции aggregate, то можно заметить, что в качестве первого значения для колонки с именем из переменной self.__result устанавливается значение из колонки target. Это сделано, чтобы сумма первых двух элементов последовательности в рамках реализованной логики прошла корректно.
В конце выполняется развёртывание массива в набор строк с помощью функции inline и дополнительное преобразование полученного набора данных к итоговому виду. Новый набор данных будет в себе содержать ключи сортировки, ключи группировки, а также целевое и рассчитанное значение.
Итоговый вызов модели осуществляется следующим образом:
ema_df = EMA(period=5, target='close', result='predict',
groupby=['stock_comp_name'], sortby=['Date'])
predicted_df = ema.fit_predict(stocks_df)
Если используется фреймворк Spark ниже версии 3.4.0, то некоторые части алгоритма, следует изменить как представлено на примере ниже:
Пример вызова функции aggregate для PySpark версии ниже 3.4.0
return (
input_df
.select(*input_cols, 'i', self.__result)
.groupBy(*self.__groupby)
.agg(
f.sort_array(
f.collect_list(f.struct(*input_cols, 'i', self.__result))
).alias('partition_array')
)
.withColumn('new_partitioned_array', f.aggregate(
f.expr('slice(partition_array, 2, size(partition_array))'),
f.array(f.struct(*initial_struct_cols, f.col('partition_array').getItem(0)[self.__target].alias(self.__result))),
model
)
).selectExpr('inline(new_partitioned_array)')
).withColumn(self.__result, f.when(f.col('i') < period, None).otherwise(f.col(self.__result))).drop('i')
В данном примере прямые вызовы функций были заменены на выражения в связи с тем, что возможность вызывать напрямую в PySpark такие функции как slice, size и inline была добавлено в версии начиная с 3.4.0.
Итог
В рассмотренном мной примере показано, что функция aggregate достаточно удобна для реализации рекуррентных алгоритмов, а её преимуществами является то, что она:
поддерживается для языков Python и Scala;
позволяет реализовать расчёт на кластере, минуя издержки при обработке данных на памяти драйвера;
в сравнении с аналогичным по функциональности методом UDAF (User-Defined Aggregate Functions) поддерживается для Spark Connect в версии фреймворка 3.4.0.
Пример, приведённый в настоящей статье, является синтетическим, хоть и сам метод EMA является достаточно популярным механизмом анализа биржевых котировок и анализа цен других типов активов, таких как криптовалюта и других деривативов.
Функцию aggregate я использовал неоднократно в разных задачах, связанных с обработкой временных рядов, таких как корректировка последовательности выполнения операций конвейерных линий или прогнозирование данных спроса модифицированным методом Хольта-Уинтерса для промышленных предприятий.
В реальных условиях у представленного метода мной не наблюдались какие-либо проблемы с масштабированием расчёта при условии наличия соответствующих ключей партиционирования и отсутствия явных перекосов в данных.
Исходные данные - https://www.kaggle.com/datasets/borismarjanovic/price-volume-data-for-all-us-stocks-etfs
Исходные коды - https://github.com/D3vA1ex/PySpark-EMA
Sagy
Оо, автор почти дошел до iir, может и Калмана откроет для себя. В чем новизна то? Хоть бы АФЧХ построил фильтра, годографа там всякие, устойчивость, собственные значения регрессионной модели. А ещё поднятия есть разные, улам наконец
D3vAlex Автор
Немного не понимаю вашу претензию к новизне. Статья демонстрирует способ работы с рекурентными соотношенями на фреймворке, а представленный метод является лишь только примером.