Здравствуйте дорогие хабровчане, в этом посте я покажу, как написать свой биржевой индекс наподобие S&P 500 или Nasdaq.
О том, как мне это пришло в голову можно прочитать в моей предыдущей статье: Как я решил стать трейдером и проигрался, а потом отыгрался, потому, что я программист. Мой опыт. Здесь будет рассмотрена только техническая сторона.
В силу того, что прошлая статья получила большой отклик у хабровчян, я решил сделать техническое описание кода приложения.
Не работаю в Тинькофф и мне не заплатили за рекламу. Конечно, была попытка добавить эту статью в корпоративный блог Тинькова хоть за символическую плату, но не тут то было. Меня просто месяц динамили, а потом сказали, дескать блог только исключительно для сотрудников компании (должно быть это месть за критику службы поддержки в прошлой статье).
В предыдущей статье было описано, как мне удалось рассчитать и использовать индекс чистой стоимости фонда и как я к этому пришёл. Этот индекс был написан на старом api, на тот момент единственном. Из-за этого возникло множество проблем, главная из которых большая задержка в обновлении данных.
Чтобы уменьшить эту задержку (которая в итоге составила 5,5 секунд!), мне пришлось использовать два токена доступа, которые чередовались для каждого запроса. Также я вынужден был выбрать строго ограниченное количество компаний, для которых проводился расчёт. Бутылочным горлышком являлось отсутствие возможности получать данные по нескольким компаниям в одном запросе. Например, чтобы получить текущие цены акций десяти компаний нужно было десять запросов. Если не укладывался в лимит, а количество запросов было ограничено в минуту, api возвращал ошибки на последующие обращения.
Однако, в начале 2022 года тинькофф выкатил новый api, в котором эти проблемы были решены. Теперь можно в одном запросе получать все текущие цены на акции.
Сам микросервис написан на Python 3 и упакован в Docker-контейнер, клиент — на JavaScript, график TradingView. Микросервис и клиент общаются при помощи api реализованном на FastApi.
Репозиторий с исходным кодом — на GitHub.
Шаг 1. Получаю данные из Тинькофф api
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
import sys
from tinkoff.invest import Client, CandleInterval
import warnings
warnings.filterwarnings("ignore", 'This pattern has match groups')
class CustomIndex:
def __init__(self, tickets, historical_days=2, token='token.txt'):
self.logger = self.__create_logger()
self.historical_days = historical_days
self.reset_last_candle()
try:
with open(token, 'r') as file:
self.__token = file.read().rstrip()
except Exception as e:
self.logger.exception(e)
raise Exception('--> Ошибка в файле token.txt: '+ str(e))
try:
with Client(self.__token) as client:
shares = client.instruments.shares()
except Exception as e:
self.logger.exception(e)
raise Exception('--> tinkoff api - Ошибка загрузки данных обо всех акциях.')
shares = pd.DataFrame(shares.instruments)
self.df = shares[shares['ticker'].isin(tickets)]
def reset_last_candle(self, open_price=0.0, time=pd.Timestamp(0, tz='Europe/Moscow')):
self.last_candle = pd.Series({'open': open_price,
'high': open_price,
'low': open_price,
'close': open_price,
'time': time})
def round_to_minutes(self, t, interval=5):
delta = pd.Timedelta(minutes=t.minute % interval,
seconds=t.second,
microseconds=t.microsecond)
t -= delta
return t
def __units_nano_convert(self, d):
price = '{}.{}'.format(d['units'], d['nano'])
price = float(price)
return price
def __create_logger(self):
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
formatter = logging.Formatter('--> %(asctime)s - %(name)s - %(levelname)s - %(message)s')
sh = logging.StreamHandler(sys.stdout)
sh.setFormatter(formatter)
logger.addHandler(sh)
# send logs in docker logs
fh = logging.FileHandler('/proc/1/fd/1')
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
def get_tinkoff_candles(self, figi, interval):
if interval == 1:
interval = CandleInterval.CANDLE_INTERVAL_1_MIN
elif interval == 5:
interval = CandleInterval.CANDLE_INTERVAL_5_MIN
elif interval == 15:
interval = CandleInterval.CANDLE_INTERVAL_15_MIN
else:
interval = -1
curr_time = datetime.now()
data = []
for day in range(self.historical_days):
try:
with Client(self.__token) as client:
data += client.market_data.get_candles(
figi=figi,
from_=curr_time - timedelta(days=day+1),
to=curr_time - timedelta(days=day),
interval=interval
).candles
except Exception as e:
self.logger.exception(e)
raise Exception('--> tinkoff api - history - Ошибка загрузки исторических данных.')
candles = pd.DataFrame(data)
for col in ['open', 'high', 'low', 'close']:
candles[col] = candles[col].apply(self.__units_nano_convert)
candles = candles[['time', 'open', 'high', 'low', 'close']]
candles['time'] = candles['time'].dt.tz_convert('Europe/Moscow')
candles.set_index('time', inplace=True)
candles = candles.drop_duplicates()
return candles
def get_tinkoff_last_prices(self):
try:
with Client(self.__token) as client:
last_prices = client.market_data.get_last_prices(figi=self.df['figi'].to_list())
except Exception as e:
self.logger.exception(e)
raise Exception('--> tinkoff api - last price - Ошибка загрузки последней цены.')
last_prices = pd.DataFrame(last_prices.last_prices)
last_prices = last_prices['price'].apply(self.__units_nano_convert)
return last_prices
У каждой компании, торгующейся на бирже, кроме названия, есть свой тикет. Однако, для того, чтобы получить данные по компании необходимо знать её figi
код. Чтобы его получить необходимо сопоставить тикет и figi
код. Это происходит в конструкторе класса CustomIndex
. В нём скачиваются figi
для всех акций и сопоставляются с нужными мне тикетами. Здесь нет ничего сложного.
Метод get_tinkoff_candles
получает исторические данные по figi
коду. Исходя из ограничений api на интервалах 1, 5 и 15 минут можно получить только один день. Так же было и в старом api. То есть, чем больше исторических данных мне нужно, тем больше нужно запросов (ограничение запросов в минуту никуда не делось). Любопытно было бы узнать, из каких соображений данные ограничили одним днём.
Ну и главное отличие от старого api я с удовольствием использовал в методе get_tinkoff_last_prices
. В нём обновляются все последние цены для всех акций за один запрос.
Шаг 2. Микросервис (FastApi)
import pandas as pd
import numpy as np
from collections import defaultdict
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from custom_index import CustomIndex
# Топ 5 самых дорогих акций из S&P 500
tickets = ['NVR', 'AMZN', 'GOOG', 'GOOGL', 'BKNG']
# Количество исторических дней для отображения
historical_days = 4
# Расчёт индекса
def compute_index(prices):
mean_price = np.mean(prices)
return mean_price
api = FastAPI()
api.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
ci = CustomIndex(tickets, historical_days=historical_days, token='token.txt')
# Исторические данные
@api.get('/api/historical_candles/{interval}')
def historical_candles(interval: int):
global ci
# Средняя цена
d = defaultdict(pd.DataFrame)
# Собираем данные, все open от всех тикетов в один
# датафрейм, high в другой и т.д.
def concat_columns(d, one_ticket_candles):
for col in ['open', 'high', 'low', 'close']:
d[col] = pd.concat([d[col], one_ticket_candles[col]], axis=1)
return d
try:
# Исторические данные для каждого тикета
# скачиваем и обрабатываем отдельно
for _, row in ci.df.iterrows():
one_ticket_candles = ci.get_tinkoff_candles(row['figi'], interval)
d = concat_columns(d, one_ticket_candles)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
for col in ['open', 'high', 'low', 'close']:
d[col] = d[col].sort_index(ascending=True)
d[col] = d[col].fillna(method='ffill')
d[col] = d[col].dropna()
d[col] = d[col].apply(compute_index, axis=1)
candles = pd.DataFrame(d)
candles.index.name = 'time'
candles.reset_index(inplace=True)
ci.reset_last_candle()
return candles.to_json(orient="records")
# Текущая цена
@api.get('/api/currient_candle/{interval}')
def currient_candle(interval: int):
global ci
curr_time = ci.round_to_minutes(pd.Timestamp.now(tz='Europe/Moscow'), interval)
try:
last_prices = ci.get_tinkoff_last_prices()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
last_price = compute_index(last_prices)
# формирование свечей по последней цене
if curr_time > ci.last_candle['time']:
# Новая свеча
ci.reset_last_candle(open_price=last_price, time=curr_time)
else:
ci.last_candle['close'] = last_price
if ci.last_candle['high'] < last_price:
ci.last_candle['high'] = last_price
elif ci.last_candle['low'] > last_price:
ci.last_candle['low'] = last_price
return ci.last_candle.to_json()
Через этот микросервис поставляются данные клиенту. Основные параметры для настройки находятся в начале файла.
- Лист
tickets
— в нём записаны тикеты компании на основе которых будет рассчитываться индекс. Для примера я взял 5 самых дорогих акций из списка S&P 500. - Переменная
historical_days
— количество исторических дней для загрузки. Если дней будет слишком много, то можно получить ошибки, см. лимитную политику. - Функция
compute_index
— здесь происходит непосредственный расчёт индекса, в моём случае я просто беру среднюю цену всех акций.
В функции historical_candles
формируются исторические данные. Для их расчёта группируются все цены каждого тикета, т.е. цены open
, high
, low
и close
рассчитываются отдельно друг от друга. На выходе получаются исторические данные рассчитанного индекса.
Функция currient_candle
формирует текущую свечку графика и рассчитывает индекс исходя из текущих цен на акции.
Шаг 3. Клиент (TradingView)
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Custom fund index</title>
</head>
<body>
<h2>Custom fund index chart</h2>
<div>
<label for="one"> <input type="radio" id="one" name="interval" value="1" /> 1 minute </label>
<label for="five"> <input type="radio" id="five" name="interval" value="5" checked /> 5 minutes </label>
<label for="fifteen"> <input type="radio" id="fifteen" name="interval" value="15" /> 15 minutes </label>
</div>
<div id="custom_chart"></div>
</body>
<!-- TradingView Chart -->
<script src="https://unpkg.com/lightweight-charts/dist/lightweight-charts.standalone.production.js"></script>
<script src="index.js"></script>
</html>
Файл index.html
совсем простой, содержит в себе переключатель 'radio' для изменения интервала графика 1, 5 и 15 минут, также div
контейнер custom_chart
для самого графика. Здесь же подключены скрипты TradingView.
const log = console.log;
let interval = document.querySelector('input[name="interval"]:checked').value;
let currInterval = interval
const chartProperties = {
width: 1450,
height: 600,
timeScale: {
timeVisible: true,
secondsVisible: false,
}
};
const domElement = document.getElementById('custom_chart');
const chart = LightweightCharts.createChart(domElement, chartProperties);
const candleSeries = chart.addCandlestickSeries()
// History
function getHistory(interval) {
fetch(`http://127.0.0.1:8000/api/historical_candles/` + interval)
.then(res => res.json())
.then(json_str => JSON.parse(json_str))
.then(data => {
// log(data);
for (let i = 0; i < data.length; ++i) {
data[i].time = data[i].time / 1000 + 10800; // localize to Moscow time 60*60*3 = 10800
};
candleSeries.setData(data);
})
.catch(err => log(err))
}
getHistory(interval);
// Dynamic Chart
setInterval(function () {
currInterval = document.querySelector('input[name="interval"]:checked').value;
if (currInterval != interval) {
getHistory(currInterval);
interval = currInterval;
}
fetch(`http://127.0.0.1:8000/api/currient_candle/` + interval)
.then(res => res.json())
.then(json_str => JSON.parse(json_str))
.then(data => {
log(data);
data.time = data.time / 1000 + 10800 // localize to Moscow time 60*60*3 = 10800
candleSeries.update(data);
})
.catch(err => log(err))
}, 1000); // <-- Увеличивай интервал здесь!
В файле index.js
функция getHistory
получает исторические данные из микросервиса и отображает график. Обновление текущей цены происходит в планировщике вызова setInterval
, здесь же обновляются исторические данные, если поменять интервал свечей графика.
Шаг 4. Упаковываем в Docker контейнер
FROM python:3.8
RUN python -m pip install pandas numpy tinkoff-investments fastapi gunicorn uvicorn uvloop httptools
WORKDIR /app
ADD tinkoff-microservice.py tinkoff-microservice.py
ADD custom_index.py custom_index.py
ADD token.txt token.txt
Упакованный в Docker-контейнер микросервис очень удобно использовать и портировать.
version: '3.7'
services:
microservice:
build:
context: ./microservice
image: tinkoff-microservice
container_name: tinkoff-microservice
restart: unless-stopped
ports:
- "8000:8000"
command: gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker tinkoff-microservice:api
nginx:
image: nginx
container_name: nginx-html
restart: unless-stopped
volumes:
- ./html:/usr/share/nginx/html:ro
depends_on:
- microservice
ports:
- "8080:80"
Микросервис и клиент общаются между собой через порт 8000
, клиент доступен на localhost:8080
Примечание: для работы приложения нужен docker-compose и браузер. Чтобы запустить микросервис необходимо добавить свой токен от тиньков api в файл token.txt
, который находится в папке microservice
. Выполнить в корневой папке проекта docker-compose up --build
и открыть в браузере http://localhost:8080/
.
Заключение
Как и было отмечено в предыдущей статье, подобный индекс вполне себе рабочий инструмент. Однако есть несколько недочетов, которые не удалось решить (возможное решение это stream-соединения в тинькофф api):
- Не совсем точное отображение истории.
- Ошибки из-за превышения лимита запросов к тинькофф api.
Спасибо за внимание.
Комментарии (2)
tuxi
11.04.2022 09:15+1А пример (список тикеров и вес по каждому) состава индекса можно увидеть? А то может из пушки по воробьям стреляем?
sinneren
Не, ну раз уж вы упаковали в докер и сделали репу, то надо еще и подгрузку желаемых бумаг через конфиг\морду сделать :)