Здравствуйте дорогие хабровчане, в этом посте я покажу, как написать свой биржевой индекс наподобие S&P 500 или Nasdaq.


О том, как мне это пришло в голову можно прочитать в моей предыдущей статье: Как я решил стать трейдером и проигрался, а потом отыгрался, потому, что я программист. Мой опыт. Здесь будет рассмотрена только техническая сторона.


В силу того, что прошлая статья получила большой отклик у хабровчян, я решил сделать техническое описание кода приложения.


Не работаю в Тинькофф и мне не заплатили за рекламу. Конечно, была попытка добавить эту статью в корпоративный блог Тинькова хоть за символическую плату, но не тут то было. Меня просто месяц динамили, а потом сказали, дескать блог только исключительно для сотрудников компании (должно быть это месть за критику службы поддержки в прошлой статье).



В предыдущей статье было описано, как мне удалось рассчитать и использовать индекс чистой стоимости фонда и как я к этому пришёл. Этот индекс был написан на старом api, на тот момент единственном. Из-за этого возникло множество проблем, главная из которых большая задержка в обновлении данных.


Чтобы уменьшить эту задержку (которая в итоге составила 5,5 секунд!), мне пришлось использовать два токена доступа, которые чередовались для каждого запроса. Также я вынужден был выбрать строго ограниченное количество компаний, для которых проводился расчёт. Бутылочным горлышком являлось отсутствие возможности получать данные по нескольким компаниям в одном запросе. Например, чтобы получить текущие цены акций десяти компаний нужно было десять запросов. Если не укладывался в лимит, а количество запросов было ограничено в минуту, api возвращал ошибки на последующие обращения.


Однако, в начале 2022 года тинькофф выкатил новый api, в котором эти проблемы были решены. Теперь можно в одном запросе получать все текущие цены на акции.


Сам микросервис написан на Python 3 и упакован в Docker-контейнер, клиент — на JavaScript, график TradingView. Микросервис и клиент общаются при помощи api реализованном на FastApi.


Репозиторий с исходным кодом — на GitHub.



Шаг 1. Получаю данные из Тинькофф api


custom_index.py:


Показать...
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
import sys

from tinkoff.invest import Client, CandleInterval

import warnings
warnings.filterwarnings("ignore", 'This pattern has match groups')

class CustomIndex:

    def __init__(self, tickets, historical_days=2, token='token.txt'):
        self.logger = self.__create_logger()

        self.historical_days = historical_days
        self.reset_last_candle()

        try:
            with open(token, 'r') as file:
                self.__token = file.read().rstrip()
        except Exception as e:
            self.logger.exception(e)
            raise Exception('--> Ошибка в файле token.txt: '+ str(e))

        try:
            with Client(self.__token) as client:
                shares = client.instruments.shares() 
        except Exception as e:
            self.logger.exception(e)
            raise Exception('--> tinkoff api - Ошибка загрузки данных обо всех акциях.')

        shares = pd.DataFrame(shares.instruments)
        self.df = shares[shares['ticker'].isin(tickets)]

    def reset_last_candle(self, open_price=0.0, time=pd.Timestamp(0, tz='Europe/Moscow')):
        self.last_candle = pd.Series({'open': open_price, 
                                      'high': open_price, 
                                      'low': open_price, 
                                      'close': open_price, 
                                      'time': time})

    def round_to_minutes(self, t, interval=5):
        delta = pd.Timedelta(minutes=t.minute % interval, 
                            seconds=t.second, 
                            microseconds=t.microsecond)
        t -= delta

        return t

    def __units_nano_convert(self, d):
        price = '{}.{}'.format(d['units'], d['nano'])
        price = float(price)

        return price

    def __create_logger(self):
        logger = logging.getLogger()
        logger.setLevel(logging.ERROR)

        formatter = logging.Formatter('--> %(asctime)s - %(name)s - %(levelname)s - %(message)s')

        sh = logging.StreamHandler(sys.stdout)
        sh.setFormatter(formatter)    
        logger.addHandler(sh)

        # send logs in docker logs
        fh = logging.FileHandler('/proc/1/fd/1')
        fh.setFormatter(formatter)    
        logger.addHandler(fh)

        return logger

    def get_tinkoff_candles(self, figi, interval):
        if interval == 1:
            interval = CandleInterval.CANDLE_INTERVAL_1_MIN
        elif interval == 5:
            interval = CandleInterval.CANDLE_INTERVAL_5_MIN
        elif interval == 15:
            interval = CandleInterval.CANDLE_INTERVAL_15_MIN
        else:
            interval = -1

        curr_time = datetime.now()
        data = []       

        for day in range(self.historical_days):
            try:
                with Client(self.__token) as client:
                    data += client.market_data.get_candles(
                        figi=figi,
                        from_=curr_time - timedelta(days=day+1),
                        to=curr_time - timedelta(days=day),
                        interval=interval
                    ).candles
            except Exception as e:
                self.logger.exception(e)
                raise Exception('--> tinkoff api - history - Ошибка загрузки исторических данных.')

        candles = pd.DataFrame(data)

        for col in ['open', 'high', 'low', 'close']:
            candles[col] = candles[col].apply(self.__units_nano_convert)

        candles = candles[['time', 'open', 'high', 'low', 'close']]

        candles['time'] = candles['time'].dt.tz_convert('Europe/Moscow')
        candles.set_index('time', inplace=True)

        candles = candles.drop_duplicates()

        return candles

    def get_tinkoff_last_prices(self):
        try:
            with Client(self.__token) as client:
                last_prices = client.market_data.get_last_prices(figi=self.df['figi'].to_list())
        except Exception as e:
            self.logger.exception(e)
            raise Exception('--> tinkoff api - last price - Ошибка загрузки последней цены.')

        last_prices = pd.DataFrame(last_prices.last_prices)

        last_prices = last_prices['price'].apply(self.__units_nano_convert)

        return last_prices

У каждой компании, торгующейся на бирже, кроме названия, есть свой тикет. Однако, для того, чтобы получить данные по компании необходимо знать её figi код. Чтобы его получить необходимо сопоставить тикет и figi код. Это происходит в конструкторе класса CustomIndex. В нём скачиваются figi для всех акций и сопоставляются с нужными мне тикетами. Здесь нет ничего сложного.


Метод get_tinkoff_candles получает исторические данные по figi коду. Исходя из ограничений api на интервалах 1, 5 и 15 минут можно получить только один день. Так же было и в старом api. То есть, чем больше исторических данных мне нужно, тем больше нужно запросов (ограничение запросов в минуту никуда не делось). Любопытно было бы узнать, из каких соображений данные ограничили одним днём.


Ну и главное отличие от старого api я с удовольствием использовал в методе get_tinkoff_last_prices. В нём обновляются все последние цены для всех акций за один запрос.


Шаг 2. Микросервис (FastApi)


tinkoff-microservice.py:


Показать...
import pandas as pd
import numpy as np
from collections import defaultdict
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware

from custom_index import CustomIndex

# Топ 5 самых дорогих акций из S&P 500
tickets = ['NVR', 'AMZN', 'GOOG', 'GOOGL', 'BKNG']

# Количество исторических дней для отображения
historical_days = 4

# Расчёт индекса
def compute_index(prices):
    mean_price = np.mean(prices)

    return mean_price

api = FastAPI()

api.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

ci = CustomIndex(tickets, historical_days=historical_days, token='token.txt')

# Исторические данные
@api.get('/api/historical_candles/{interval}')
def historical_candles(interval: int):
    global ci

    # Средняя цена
    d = defaultdict(pd.DataFrame)

    # Собираем данные, все open от всех тикетов в один 
    # датафрейм, high в другой и т.д.
    def concat_columns(d, one_ticket_candles):
        for col in ['open', 'high', 'low', 'close']:
            d[col] = pd.concat([d[col], one_ticket_candles[col]], axis=1)

        return d        

    try:
        # Исторические данные для каждого тикета
        # скачиваем и обрабатываем отдельно
        for _, row in ci.df.iterrows():
            one_ticket_candles = ci.get_tinkoff_candles(row['figi'], interval)
            d = concat_columns(d, one_ticket_candles)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    for col in ['open', 'high', 'low', 'close']:
        d[col] = d[col].sort_index(ascending=True)
        d[col] = d[col].fillna(method='ffill')
        d[col] = d[col].dropna()

        d[col] = d[col].apply(compute_index, axis=1)

    candles = pd.DataFrame(d)

    candles.index.name = 'time'
    candles.reset_index(inplace=True)

    ci.reset_last_candle()

    return candles.to_json(orient="records")

# Текущая цена
@api.get('/api/currient_candle/{interval}')
def currient_candle(interval: int):
    global ci

    curr_time = ci.round_to_minutes(pd.Timestamp.now(tz='Europe/Moscow'), interval)

    try:
        last_prices = ci.get_tinkoff_last_prices()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    last_price = compute_index(last_prices)

    # формирование свечей по последней цене
    if curr_time > ci.last_candle['time']:
        # Новая свеча
        ci.reset_last_candle(open_price=last_price, time=curr_time)
    else:
        ci.last_candle['close'] = last_price

        if ci.last_candle['high'] < last_price:
            ci.last_candle['high'] = last_price

        elif ci.last_candle['low'] > last_price:
            ci.last_candle['low'] = last_price

    return ci.last_candle.to_json()

Через этот микросервис поставляются данные клиенту. Основные параметры для настройки находятся в начале файла.


  • Лист tickets — в нём записаны тикеты компании на основе которых будет рассчитываться индекс. Для примера я взял 5 самых дорогих акций из списка S&P 500.
  • Переменная historical_days — количество исторических дней для загрузки. Если дней будет слишком много, то можно получить ошибки, см. лимитную политику.
  • Функция compute_index — здесь происходит непосредственный расчёт индекса, в моём случае я просто беру среднюю цену всех акций.

В функции historical_candles формируются исторические данные. Для их расчёта группируются все цены каждого тикета, т.е. цены open, high, low и close рассчитываются отдельно друг от друга. На выходе получаются исторические данные рассчитанного индекса.


Функция currient_candle формирует текущую свечку графика и рассчитывает индекс исходя из текущих цен на акции.


Шаг 3. Клиент (TradingView)


index.html:


Показать...
<!DOCTYPE html>
<html>

<head>
  <meta charset="utf-8">
  <title>Custom fund index</title>
</head>

<body>
  <h2>Custom fund index chart</h2>

  <div>
    <label for="one"> <input type="radio" id="one" name="interval" value="1" /> 1 minute </label>
    <label for="five"> <input type="radio" id="five" name="interval" value="5" checked /> 5 minutes </label>
    <label for="fifteen"> <input type="radio" id="fifteen" name="interval" value="15" /> 15 minutes </label>
  </div>

  <div id="custom_chart"></div>
</body>

<!-- TradingView Chart -->
<script src="https://unpkg.com/lightweight-charts/dist/lightweight-charts.standalone.production.js"></script>

<script src="index.js"></script>

</html>

Файл index.html совсем простой, содержит в себе переключатель 'radio' для изменения интервала графика 1, 5 и 15 минут, также div контейнер custom_chart для самого графика. Здесь же подключены скрипты TradingView.


index.js:


Показать...
const log = console.log;

let interval = document.querySelector('input[name="interval"]:checked').value;
let currInterval = interval

const chartProperties = {
  width: 1450,
  height: 600,
  timeScale: {
    timeVisible: true,
    secondsVisible: false,
  }
};

const domElement = document.getElementById('custom_chart');
const chart = LightweightCharts.createChart(domElement, chartProperties);
const candleSeries = chart.addCandlestickSeries()

// History
function getHistory(interval) {
  fetch(`http://127.0.0.1:8000/api/historical_candles/` + interval)
    .then(res => res.json())
    .then(json_str => JSON.parse(json_str))
    .then(data => {
      // log(data);

      for (let i = 0; i < data.length; ++i) {
        data[i].time = data[i].time / 1000 + 10800; // localize to Moscow time 60*60*3 = 10800 
      };

      candleSeries.setData(data);
    })
    .catch(err => log(err))
}

getHistory(interval);

// Dynamic Chart
setInterval(function () {
  currInterval = document.querySelector('input[name="interval"]:checked').value;

  if (currInterval != interval) {
    getHistory(currInterval);
    interval = currInterval;
  }

  fetch(`http://127.0.0.1:8000/api/currient_candle/` + interval)
    .then(res => res.json())
    .then(json_str => JSON.parse(json_str))
    .then(data => {

      log(data);

      data.time = data.time / 1000 + 10800 // localize to Moscow time 60*60*3 = 10800

      candleSeries.update(data);
    })
    .catch(err => log(err))
}, 1000); // <-- Увеличивай интервал здесь!

В файле index.js функция getHistory получает исторические данные из микросервиса и отображает график. Обновление текущей цены происходит в планировщике вызова setInterval, здесь же обновляются исторические данные, если поменять интервал свечей графика.


Шаг 4. Упаковываем в Docker контейнер


Dockerfile:


Показать...
FROM python:3.8

RUN python -m pip install pandas numpy tinkoff-investments fastapi gunicorn uvicorn uvloop httptools

WORKDIR /app

ADD tinkoff-microservice.py tinkoff-microservice.py
ADD custom_index.py custom_index.py
ADD token.txt token.txt

Упакованный в Docker-контейнер микросервис очень удобно использовать и портировать.


docker-compose.yml:


Показать...
version: '3.7'

services:
    microservice:
        build:
            context: ./microservice
        image: tinkoff-microservice
        container_name: tinkoff-microservice
        restart: unless-stopped
        ports:
            - "8000:8000"
        command: gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker tinkoff-microservice:api

    nginx:
        image: nginx
        container_name: nginx-html
        restart: unless-stopped
        volumes:
            - ./html:/usr/share/nginx/html:ro
        depends_on:
            - microservice
        ports:
            - "8080:80"

Микросервис и клиент общаются между собой через порт 8000, клиент доступен на localhost:8080



Примечание: для работы приложения нужен docker-compose и браузер. Чтобы запустить микросервис необходимо добавить свой токен от тиньков api в файл token.txt, который находится в папке microservice. Выполнить в корневой папке проекта docker-compose up --build и открыть в браузере http://localhost:8080/.


Заключение


Как и было отмечено в предыдущей статье, подобный индекс вполне себе рабочий инструмент. Однако есть несколько недочетов, которые не удалось решить (возможное решение это stream-соединения в тинькофф api):


  • Не совсем точное отображение истории.
  • Ошибки из-за превышения лимита запросов к тинькофф api.

Спасибо за внимание.

Комментарии (2)


  1. sinneren
    11.04.2022 09:12
    +1

    Не, ну раз уж вы упаковали в докер и сделали репу, то надо еще и подгрузку желаемых бумаг через конфиг\морду сделать :)


  1. tuxi
    11.04.2022 09:15
    +1

    А пример (список тикеров и вес по каждому) состава индекса можно увидеть? А то может из пушки по воробьям стреляем?