В финтехе нам часто приходится обрабатывать довольно массивные объемы данных курсов обмена валют. Мы получаем данные из разных источников, и каждый из них имеет собственное представление о том, как экстраполировать значения курсов на завтра, послезавтра, следующий месяц и даже следующие три года. Если бы кто-то умел предсказывать курсы правильно, впору было бы закрывать бизнес и просто тупо менять деньги туда-сюда. Некоторые источники пользуются бо?льшим доверием, некоторые поставляют сплошь мусор, с редкими вкраплениями почти правильных значений, но зато для экзотических пар. Наша работа заключается в том, чтобы просеять эти десятки тысяч значений в секунду и определить, что именно показать заказчикам. Нам нужно отфильтровать единственное правильное значение из тонны грязи и ила, как это делают фламинго на обеде.
Особым отличительным признаком фламинго является массивный выгнутый вниз клюв, с помощью которого они фильтруют пищу из воды или ила.
— Вики
Так родилась библиотека Vela
, которая хранит кеш состояния для нескольких значений в заданных временных интервалах. Под капотом она на лету отсеивает плохие и устаревшие данные, а также предоставляет доступ к последним N прошедшим валидацию значениям для каждого ключа (пары валют, в нашем случае).
Допустим, мы собираем курсы для трех пар валют. Простейшее определение Vela
для хранения актуального состояния будет выглядеть как-то так:
defmodule Pairs do
use Vela,
eurusd: [sorter: &Kernel.<=/2],
eurgbp: [limit: 3, errors: 1],
eurcad: [validator: Pairs]
@behaviour Vela.Validator
@impl Vela.Validator
def valid?(:eurcad, rate), do: rate > 0
end
Обновление значений
Vela.put/3
функция последовательно сделает следующее:
- вызовет
validator
на значении, если таковой определен (см. главку Валидация ниже); - добавит значение либо в ряд хороших значений, если валидация закончилась успешно, или в служебный ряд
:__errors__
в обратном случае; - вызовет сортировку если
sorter
определен для данного ключа, или просто положит значение в голову списка (LIFO, см. главку Сортировка ниже); - обрежет ряд в соответствии с параметром
:limit
переданном при создании; - вернет обновленную структуру
Vela
.
iex|1 > pairs = %Pairs{}
iex|2 > Vela.put(pairs, :eurcad, 1.0)
#? %Pairs{..., eurcad: [1.0], ...}
iex|3 > Vela.put(pairs, :eurcad, -1.0)
#? %Pairs{__errors__: [eurcad: -1.0], ...}
iex|4 > pairs |> Vela.put(:eurusd, 2.0) |> Vela.put(:eurusd, 1.0)
#? %Pairs{... eurusd: [1.0, 2.0]}
Также Vela
имплементирует Access
, так что можно для обновления значений воспользоваться любой из стандартных функций для глубокого обновления структур из арсенала Kernel
: Kernel.get_in/2
, Kernel.put_in/3
, Kernel.update_in/3
, Kernel.pop_in/2
, and Kernel.get_and_update_in/3
.
Валидация
Валидатор может быть определен как:
- внешняя функция с одним аргументом (
&MyMod.my_fun/1
), она получит только значение для валидации; - внешняя функция с двумя аргументами,
&MyMod.my_fun/2
, она получит паруserie, value
для валидации; - модуль, имплементирующий
Vela.Validator
; - конфигурационный параметр
threshold
, и — опционально —compare_by
, см. главку Comparison ниже.
Если валидация прошла успешно, значение добавляется в список под соответствующим ключом, в обратном случае кортеж {serie, value}
отправляется в :__errors_
.
Сравнение
Значения, сохраняемые в этих рядах, могут быть любыми. Чтобы научить Vela
их сравнивать, необходимо передать compare_by
параметр в определение ряда (если только значения не могут быть сравнены стандартным Kernel.</2
); этот параметр должен иметь тип (Vela.value() -> number())
. По умолчанию это просто & &1
.
Также, в определение ряда можно передать параметр comparator
для вычисления значений дельт (min
/max
); например, передавая Date.diff/2
в качестве компаратора, можно получить правильные дельты для дат.
Другим удобным способом работы является передача параметра threshold
, который определяет максимально допустимое отношение нового значения к {min, max}
интервалу. Поскольку он задан в процентах, проверка не использует comparator
, но все еще использует compare_by
. Например, чтобы указать пороговое значение для времени дат, необходимо указать compare_by: &DateTime.to_unix/1
(для получения целочисленного значения) и threshold: 1
, в результате чего новые значения будут разрешены, только если они находятся в ±band
интервале от текущих значений.
Наконец, можно использовать Vela.equal?/2
для сравнения двух кешей. Если значения определяют функцию equal?/2
или compare/2
, то эти функции будут использованы для сравнения, в противном случае мы тупо используем ==/2
.
Получение значений
Обработка текущего состояния обычно начинается с вызова Vela.purge/1
, который убирает устаревшие значения (если validator
завязан на timestamps
). Затем можно вызвать Vela.slice/1
, которая вернет keyword
с именами рядов в качестве ключей и первым, актуальными значениями.
Также можно воспользоваться get_in/2
/pop_in/2
для низкоуровнего доступа к значениям в каждом ряду.
Приложение
Vela
может оказаться чрезвычайно полезной в качестве кеша временных рядов в стейте процесса типа GenServer
/Agent
. Мы хотим никогда не использовать устаревшие значения курсов, и для этого мы просто держим процесс с состоянием, обрабатываемым Vela
, с валидатором, показанным ниже.
@impl Vela.Validator
def valid?(_key, %Rate{} = rate),
do: Rate.age(rate) < @death_age
и Vela.purge/1
спокойно удаляет все устаревшие значения каждый раз, когда нам требуются данные. Для доступа к актуальным значениям мы просто вызываем Vela.slice/1
, а когда требуется небольшая история по курсу (весь ряд целиком), мы просто возвращаем его — уже отсортированным — с провалидированными значениями.
Удачного кеширования временных рядов!
anonymous
Перечитал статью дважды и не понял как все это применить в жизни. Успешно решаем работу с временными рядами средствами ClickHouse (тиковые биржевые данные). В чем преимущество вашего решения и прямых выборок из кликхауза?
chapuza Автор
Ээээ… Ну принципиальных отличий два:
Vela
— это in-memory решение. Паттерн использования примерно такой: запускаем процесс на каждую пару валют (или на любой параметр временно?го ряда) и этот процесс держит актуальный кеш в стейте. Не нужны никакие 3rd-party базы. У нас примерно 20К пар, курсы по каждой приходят примерно 10 раз в секунду; 200 000 записей в ClickHouse в секунду — это так себе вариант, особенно учитывая, что хранить курсы нам особо не требуется. Подключим еще десять провайдеров — это число утроится. С решением выше я просто добавлю в OTP кластер ±2 машины.Есть и недостатки:
Cloister
.anonymous
Такой себе специализированный Tarantool. :)
Спасибо за ответ. По количеству записей в кликхауз сделаю ремарку — пакетные записи в него очень быстрые — думаю вы понимаете, что тиковые данные на фьючерсном рынке по 20 инструментам генерируют намного большие объёмы данных. Это не для полемики — просто для информации.
chapuza Автор
Это я понимаю. Но я не до конца понимаю, как я соберу этот пакет. А кроме того, у меня совсем не read-only, мне еще придется оттуда читать, потому что мне надо адекватно отреагировать на каждую пришедшую единицу данных, и это некоторая арифметика, не очень сложная, но все же не
if x > 0, do: spawn()
. Когда мы получаем новый курс, нам нужно сравнить его по нескольким параметрам со шлейфом последних, и или выбросить, или запихнуть в самый конец кеша, или в начало — и использовать для создания нескольких новых ивентов.У нас паттерны принципиально разные, похоже. Мы пришли к тому, что используя OTP — надо использовать OTP. Процесс знает только про свою пару валют, и оперирует только ей. Если процесс крашится — отваливается только эта пара (там есть защитная прослойка, но это совсем вне темы).
Как я соберу пакет из 20К разных процессов из разных нод кластера? Кроме того, быстрое-то оно быстрое, но нуждается в дополнительном обслуживании. Ну и, в конце концов, я вообще очень далек от мысли сравнивать memory-cache с полноценным KV. Просто есть задачи (и описанная мной — как раз из таких, и у нас есть еще похожие) — куда совсем не хочется тащить KV (вокруг которого, повторюсь, придется понастроить логики, которая будет проверять, сортировать, правильно извлекать, и т. д., сиречь все эти запросы будут мозолить глаза прямо в приложении, а если похожих приложений несколько — то и в каждом из них.