Для будущих студентов курса "Golang Developer. Professional" и всех интересующихся подготовили перевод интересного материала.

Также приглашаем на
открытый вебинар по теме «Go-каналы снаружи и внутри». Каналы в Go — это одна из основ и особенностей языка. Понимание того, как они устроены — ключ к правильному и оптимальному использованию. Присоединяйтесь, чтобы обсудить тему.


Введение

В основном я предпочитаю использовать реляционные базы данных (SQL), поскольку они предоставляют несколько возможностей, которые весьма полезны при работе с данными. SQLite - отличный выбор, так как база данных там представляет собой единый файл, что упрощает обмен данными. Несмотря на то, что это единый файл, SQLite может обрабатывать до 281 терабайта данных. SQLite также поставляется с клиентом командной строки sqlite3, который отлично подходит для быстрого прототипирования.

Примечание: В других базах данных есть транзакции, языки запросов и схемы. Однако базы данных на основе SQL имеют тенденцию быть более развитыми испытанными временем, а сам SQL стандартизирован.

Определения

Начнем с нескольких определений, которые помогут нам внести ясность.

SQLite: это встроенная база данных SQL. SQLite — небольшая, быстрая и самая часто используемая база данных. Это также, безусловно, мой излюбленный способ обмена большими объемами данных между двумя программами.

Транзакции: вы вводите данные в базу данных SQL внутри транзакции. Это означает, что либо поступают сразу все данные, либо не поступают никакие. Транзакции на порядки упрощают код повторного выполнения операций в конвейерах данных.

Схема: данные в реляционных базах данных имеют схему, что означает, что их легче проверить на достоверность.

SQL: Structured Query Language (язык структурированных запросов) — это язык для выборки и изменения данных. Вам не нужно изобретать еще один способ выборки интересующих вас частей данных. SQL — это устоявшийся формат, вокруг которого накоплено много знаний и инструментов.

Проект

Мы напишем HTTP-сервер на Go, который будет получать уведомления о сделках (trades) и хранить их в базе данных SQLite. Затем мы напишем программу на Python, которая будет обрабатывать эти данные.

В Go мы будем использовать github.com/mattn/go-sqlite3, который является оболочкой для библиотеки C SQLite.

Примечание: Поскольку go-sqlite использует cgo, изначальное время сборки будет больше обычного. Использование cgo означает, что результирующий исполняемый файл зависит от некоторых разделяемых библиотек из ОС, что немного усложняет распространение.

В Python мы будем использовать встроенный модуль sqlite3 и функцию read_sql Pandas, чтобы загружать данные. Исходный код из этой статьи доступен по ссылке.

Код на Go

Код, который я собираюсь вам показать, можно найти в файле trades.go.

Листинг 1: Структура Trade

37 // Trade - это сделка о покупке/продаже по символу ценной бумаги.
38 type Trade struct {
39     Time   time.Time
40     Symbol string
41     Price  float64
42     IsBuy  bool
43 }

В Листинге 1 показана структура данных Trade. У нее есть поле Time, отражающее время сделки, поле Symbol, хранящее биржевое сокращение (символ акции, например, AAPL), поле Price, содержащее цену, и логический флаг, который сообщает, это сделка покупки или продажи.

Листинг 2: Схема базы данных

24     schemaSQL = `
25 CREATE TABLE IF NOT EXISTS trades (
26     time TIMESTAMP,
27     symbol VARCHAR(32),
28     price FLOAT,
29     buy BOOLEAN
30 );
31 
32 CREATE INDEX IF NOT EXISTS trades_time ON trades(time);
33 CREATE INDEX IF NOT EXISTS trades_symbol ON trades(symbol);
34 `

В Листинге 2 объявляется схема базы данных, соответствующая структуре Trade. В строке 25 мы создаем таблицу под названием trades. В строках 26-29 мы определяем столбцы таблицы, которые соответствуют полям структуры Trade. В строках 32-33 мы создаем индексы для таблицы, чтобы обеспечить быстрое запрашивание по time и symbol.

Листинг 3: Внесение записи

16     insertSQL = `
17 INSERT INTO trades (
18     time, symbol, price, buy
19 ) VALUES (
20     ?, ?, ?, ?
21 )
22 `

Листинг 3 определяет SQL-запрос для внесения записи в базу данных. В строке 20 мы используем символ-заполнитель ? для параметров этого запроса. Никогда не используйте fmt.Sprintf для создания SQL-запроса — вы рискуете создать SQL-инъекцию.

Вставка записей по одной за раз — медленный процесс. Мы собираемся хранить записи в буфере, и, когда он достаточно заполнится, внесем все записи из буфера в базу данных. Преимущество этого подхода заключается в скорости, но он сопряжен с риском потери данных в случае сбоя на сервере.

Листинг 4: DB

45 // DB - это база данных биржевых торгов.
46 type DB struct {
47     sql    *sql.DB
48     stmt   *sql.Stmt
49     buffer []Trade
50 }

В Листинге 4 описана структура DB. В строке 47 мы устанавливаем соединение с базой данных. В строке 48 мы храним подготовленный (предварительно скомпилированный) оператор для вставки, а в строке 49 находится наш буфер ожидающих транзакций в памяти.

Листинг 5: NewDB

53 // NewDB создает Trades для управления торговлей акциями в базе данных SQLite. 
Этот API не является потокобезопасным.
54 func NewDB(dbFile string) (*DB, error) {
55     sqlDB, err := sql.Open("sqlite3", dbFile)
56     if err != nil {
57         return nil, err
58     }
59 
60     if _, err = sqlDB.Exec(schemaSQL); err != nil {
61         return nil, err
62     }
63 
64     stmt, err := sqlDB.Prepare(insertSQL)
65     if err != nil {
66         return nil, err
67     }
68 
69     db := DB{
70         sql:    sqlDB,
71         stmt:   stmt,
72         buffer: make([]Trade, 0, 1024),
73     }
74     return &db, nil
75 }

Листинг 5 демонстрирует создание готовой к использованию базы данных DB. В строке 55 мы подключаемся к базе данных с помощью драйвера “sqlite3”. В строке 60 мы применяем SQL-схему, чтобы создать таблицу trades, если она еще не существует. В строке 64 мы предварительно компилируем инструкцию InsertSQL. В строке 72 мы создаем внутренний буфер с длиной 0 и емкостью 1024.

Примечание: Чтобы не усложнять код, предоставляемый мной API DB не горутино-безопасен (в отличие от sql.DB). Если несколько горутин вызывают API одновременно, вы столкнетесь с состоянием гонки. Я оставлю это вам в качестве упражнения — сделайте этот код горутино-безопасным.

Листинг 6: Add

77 // Add сохраняет сделку в буфер. Как только буфер заполняется, 
сделки вносятся в базу данных.
79 func (db *DB) Add(trade Trade) error {
80     if len(db.buffer) == cap(db.buffer) {
81         return errors.New("trades buffer is full")
82     }
83 
84     db.buffer = append(db.buffer, trade)
85     if len(db.buffer) == cap(db.buffer) {
86         if err := db.Flush(); err != nil {
87             return fmt.Errorf("unable to flush trades: %w", err)
88         }
89     }
90 
91     return nil
92 }

В Листинге 6 приведен метод Add. В строке 84 мы добавляем сделку (trade) в буфер в памяти. В строке 85 мы проверяем, заполнен ли буфер, и если да, то мы вызываем метод Flush в строке 86, который вносит записи из буфера в базу данных.

Листинг 7: Flush

94  // Flush вносит ждущие обработки сделки в базу данных.
95  func (db *DB) Flush() error {
96      tx, err := db.sql.Begin()
97      if err != nil {
98          return err
99      }
100 
101     for _, trade := range db.buffer {
102         _, err := tx.Stmt(db.stmt).Exec(trade.Time, trade.Symbol, trade.Price, trade.IsBuy)
103         if err != nil {
104             tx.Rollback()
105             return err
106         }
107     }
108 
109     db.buffer = db.buffer[:0]
110     return tx.Commit()
111 }

В Листинге 7 приведен метод Flush. В строке 96 мы начинаем транзакцию. В строке 101 мы итерируем по внутреннему буферу, а в строке 102 вносим каждую сделку. Если при внесении произошла ошибка, мы выполняем rollback в строке 104. В строке 109 мы сбрасываем буфер сделок в памяти. И наконец, в строке 110 мы выполняем commit.

Листинг 8: Close

113 // Close вносит (посредством Flush) все сделки в базу данных и предотвращает любую торговлю в будущем.
114 func (db *DB) Close() error {
115     defer func() {
116         db.stmt.Close()
117         db.sql.Close()
118     }()
119 
120     if err := db.Flush(); err != nil {
121         return err
122     }
123 
124     return nil
125 }

В Листинге 8 приведен метод Close. В строке 120 мы вызываем Flush, чтобы внести все оставшиеся сделки в базу данных. В строках 116 и 117 мы закрываем (close) инструкцию и базу данных. Функции, создающие DB, должны иметь функцию defer db.Close(), чтобы убедиться, что связь с базой данных закончена.

Листинг 9: Импорты

5 // Ваши пакеты main и test требуют эти импорты, чтобы пакет sql был правильно инициализирован.
6 // _ "github.com/mattn/go-sqlite3"
7 
8 import (
9     "database/sql"
10     "errors"
11     "fmt"
12     "time"
13 )

В Листинге 9 приведен импорт для файла. В строке 5 мы импортируем database/sql, которая определяет API для работы с базами данных SQL. database/sql не содержит какого-либо конкретного драйвера базы данных.

Как говорится в комментарии к оператору импорта, чтобы использовать пакет trades, вам необходимо импортировать пакет, который реализует драйвер базы данных sqlite3 (например, github.com/mattn/go-sqlite3). Поскольку вы импортируете пакет, реализующий драйвер только для небольшого изменения регистрации протокола “sqlite3”, мы используем перед импортом, сообщая компилятору Go, что то, что мы не используем этот пакет в коде — это нормально.

Листинг 10: Пример использования

Код этих примеров можно найти в файле tradestest.go.

66 func ExampleDB() {
67     dbFile := "/tmp/db-test" + time.Now().Format(time.RFC3339)
68     db, err := trades.NewDB(dbFile)
69     if err != nil {
70         fmt.Println("ERROR: create -", err)
71         return
72     }
73     defer db.Close()
74 
75     const count = 10_000
76     for i := 0; i < count; i++ {
77         trade := trades.Trade{
78             Time:   time.Now(),
79             Symbol: "AAPL",
80             Price:  rand.Float64() * 200,
81             IsBuy:  i%2 == 0,
82         }
83         if err := db.Add(trade); err != nil {
84             fmt.Println("ERROR: insert - ", err)
85             return
86         }
87     }
88 
89     fmt.Printf("inserted %d records\n", count)
90     // Вывод:
91     // inserted 10000 records
92 }

В Листинге 10 показан пример использования (в виде тестируемого примера). В строке 67 мы создаем новую базу данных, а в строке 73 мы закрываем ее с помощью оператора defer. В строке 76 мы запускаем цикл для вставки сделок, а в строке 83 мы собственно и вставляем сделку в базу данных.

Код на Python

Примеры кода на Python можно найти в файле analysis_trades.py.

Листинг 11: Импорты

02 import sqlite3
03 from contextlib import closing
04 from datetime import datetime
05 
06 import pandas as pd

В Листинге 11 показаны библиотеки, которые мы используем в нашем Python-коде. В строке 2 мы импортируем встроенный модуль sqlite3, а в строке 6 — библиотеку pandas.

Листинг 12: Select SQL

08 select_sql = """
09 SELECT * FROM trades
10 WHERE time >= ? AND time <= ?
11 """

В Листинге 12 показан SQL-запрос для получения данных. В строке 10 мы выбираем все столбцы из таблицы trades. В строке 10 мы добавляем элемент WHERE для выбора временного диапазона. Как и в Go-коде мы используем заполнители аргументов ? и не пишем SQL-запросы вручную.

Листинг 13: Загрузка сделок

14 def load_trades(db_file, start_time, end_time):
15     """Загружаем сделки из db_file за заданный временной диапазон."""
16     conn = sqlite3.connect(db_file)
17     with closing(conn) as db:
18         df = pd.read_sql(select_sql, db, params=(start_time, end_time))
19 
20     # Мы не можем использовать здесь detect_types=sqlite3.PARSE_DECLTYPES, поскольку Go вставляет часовой пояс, а sqlite3 Python не умеет обрабатывать его.
22     # Смотрите https://bugs.python.org/issue29099# См. Https://bugs.python.org/issue29099
23     df["time"] = pd.to_datetime(df["time"])
24     return df

В Листинге 13 показан код для загрузки сделок из заданного временного диапазона. В строке 16 мы подключаемся к базе данных. В строке 17 мы используем менеджер контекста, что-то вроде defer в Go, чтобы убедиться, что база данных закрыта. В строке 18 мы используем функцию pandas read_sql для загрузки данных из SQL-запроса в DataFrame. Python имеет API для подключения к базам данных (например, database/sql), а Pandas может использовать любой совместимый драйвер. В строке 23 мы конвертируем столбец time в Timestamp pandas. Это особенность SQLite, в котором нет встроенной поддержки TIMESTAMP типов.

Листинг 14: Средняя цена

27 def average_price(df):
28     """Возвращает среднюю цену в df, сгруппированную по (stock, buy)"""
29     return df.groupby(["symbol", "buy"], as_index=False)["price"].mean()

В Листинге 14 показано, как вычислить среднюю цену на symbol и buy. В строке 29 мы используем DataFrame groupby для группировки по symbol и buy. Мы используем as_index=False, чтобы получить symbol и buy в виде столбцов в итоговом фрейме данных. Затем мы берем столбец price и вычисляем среднее значение для каждой группы.

Листинг 15: Вывод

symbol,buy,price
AAPL,0,250.82925665004535
AAPL,1,248.28277375538832
GOOG,0,250.11537993385295
GOOG,1,252.4726772487683
MSFT,0,250.9214212695317
MSFT,1,248.60187022941685
NVDA,0,250.3844763417279
NVDA,1,249.3578146208962

В Листинге 15 показан результат выполнения кода Python на тестовых данных.

Заключение

Я настоятельно рекомендую вам рассмотреть возможность использования SQLite в вашем следующем проекте. SQLite — это испытанная временем стабильная платформа, способная обрабатывать огромные объемы данных. Многие языки программирования имеют драйверы для SQLite, что делает его хорошим вариантом хранения.

Я максимально упростил код, чтобы показать наиболее интересные моменты. Есть несколько мест, где вы можете попробовать улучшить его:

  • Добавить код повторного выполнения в Flush

  • Выполнить дополнительную проверку ошибок в Close

  • Сделать DB горутино-безопасным

  • Реализовать больше аналитики в Python-части

Я также рекомендую вам изучить SQL, так как очень много данных хранятся в базах данных SQL. Вам не нужно быть экспертом, знание базовых вариантов select уже даст вам новый мощный инструмент в вашем инструментарии анализа данных.

Если вы не хотите напрямую работать с SQL, есть несколько альтернатив, в мире Go есть такие пакеты, как sqlx, gorm, а в мире Python есть SQLAlchemy, который может использовать Pandas.

Приятного вам кодинга. Дайте мне знать, как вы используете SQL в целом и SQLite в частности.


Узнать подробнее о курсе "Golang Developer. Professional".

Смотреть открытый вебинар по теме «Go-каналы снаружи и внутри».