Привет, Хабр!
Когда делаешь сервисы на C++ и вокруг летает много данных, в какой-то момент понимаешь простую вещь: REST хорош для управления сущностями, но плохо подходит для потока колонок в десятки гигабайт в секунду. Переносить фреймы по сто миллионов строк через JSON и спотыкаться об сериализацию — не наш путь. В статье рассмотрим как собрать транспорт данных на Apache Arrow Flight и где встраивается ADBC, чтобы между сервисами гонять таблицы почти на скорости сети и не городить зоопарк драйверов.
Зачем вообще Arrow Flight и где здесь ADBC
Arrow — это общий колоннарный формат в памяти. Flight — это RPC-протокол/фреймворк поверх gRPC для передачи данных Arrow по сети без дополнительных сериализаций. Он дает три нужные нам операции: DoGet
для скачивания, DoPut
для загрузки и DoExchange
для двунаправленного обмена. Планирование загрузки делает GetFlightInfo
, а для долгих вычислений есть PollFlightInfo
, чтобы не блокироваться. Сервер может вернуть несколько endpoints, клиент качает их в параллель.
ADBC — сам по себе это не протокол, а унифицированный клиентский API поверх Arrow. Он дает DB-API для Python и аналоги для C/C++/Go/Java и др., где все результаты и параметры — уже Arrow-потоки, без поколоночных конвертаций туда-обратно. Часть драйверов общается с БД по их родному протоколу, но есть и драйвер Flight SQL, который бьется в любую СУБД, реализовавшую Flight SQL на сервере.
Flight SQL — это надстройка протокола Flight для SQL: подготовленные выражения, метаданные, параметры, потоковые выборки — всё в Arrow. Благодаря общему протоколу можно получить драйвера JDBC/ODBC/ADBC в нагрузку, не переписывая клиентов под каждую базу.
Что по производительности
Есть открытые бенчмарки: на типовой сети Flight показывал ~6 GB/s на DoGet
и ~4.8 GB/s на DoPut
на один поток; при масштабировании потоков throughput рос до ~10 GB/s. Это сопоставимо с сетью и близко к RDMA по пропускной способности в экспериментах на InfiniBand. Понятно, что исход зависит от схемы, компрессии и CPU, но оценку порядка дает.
Но помним: Flight гоняет Arrow IPC поток. Если включить словарную кодировку и/или ZSTD/LZ4 компрессию в IPC, можно драматично уменьшать трафик на строковых/категориальных колонках. Arrow IPC поддерживает LZ4 и ZSTD; поддержки нужно смотреть по конкретным сборкам библиотек.
Безопасность и подключение
Аутентификация в Flight делается через handshake с выдачей токена, через кастомные заголовки и middleware, либо mTLS. Удобно держать mTLS на уровне транспорта и добавлять авторизацию поверх через пер-вызовные токены, чтобы не потеряться за балансировщиками уровня 7. Клиенты/серверы понимают схемы URI grpc://
, grpc+tls://
, grpc+unix://
и специальный arrow-flight-reuse-connection://?
для повторного использования соединения.
Рассмотрим рабочие фрагменты.
Минимальный Flight-сервер на Python с TLS, DoGet/DoPut и таблицами Arrow
# pyproject.toml: pyarrow>=15
import os
import pyarrow as pa
import pyarrow.flight as fl
class TableRepo:
def __init__(self):
self._tables = {}
def put(self, name: str, table: pa.Table):
self._tables[name] = table
def get(self, name: str) -> pa.Table:
if name not in self._tables:
raise KeyError(name)
return self._tables[name]
REPO = TableRepo()
class RepoFlightServer(fl.FlightServerBase):
def __init__(self, location: fl.Location, cert_chain: bytes, private_key: bytes):
super().__init__(location, tls_cert_chain=cert_chain, tls_private_key=private_key)
# мидлварь можно навесить через конструктор; здесь опустим для краткости
# Клиент спрашивает, что есть и где качать
def get_flight_info(self, context, descriptor: fl.FlightDescriptor):
name = descriptor.path[0].decode()
table = REPO.get(name)
schema = table.schema
# один endpoint, можно раздать N endpoints для параллельного чтения
endpoints = [fl.FlightEndpoint(ticket=fl.Ticket(name.encode()), locations=[self.location])]
return fl.FlightInfo(schema, descriptor, endpoints, table.num_rows, table.nbytes)
# Клиент качает данные
def do_get(self, context, ticket: fl.Ticket):
name = ticket.ticket.decode()
table = REPO.get(name)
return fl.RecordBatchStream(table)
# Клиент заливает данные
def do_put(self, context, descriptor: fl.FlightDescriptor, reader, writer):
name = descriptor.path[0].decode()
batches = []
schema = reader.schema
for batch in reader:
batches.append(batch)
# опционально: писать промежуточный прогресс в app_metadata
writer.write_metadata(pa.py_buffer(b"ok"))
REPO.put(name, pa.Table.from_batches(batches, schema))
writer.done_writing()
def main():
# Загружаем PEM сертификаты
with open(os.environ["TLS_CERT"], "rb") as f:
cert = f.read()
with open(os.environ["TLS_KEY"], "rb") as f:
key = f.read()
server = RepoFlightServer(fl.Location.for_grpc_tls("0.0.0.0", 31337), cert, key)
server.serve() # блокирующий цикл
if __name__ == "__main__":
main()
Сервер вернул один endpoint на get_flight_info
. Для больших наборов можно возвращать несколько endpoints с разными Ticket
, которые клиенты скачивают параллельно. Это дефолт способ масштабировать чтение. Если планирование тяжелое, вместо get_flight_info
используйте PollFlightInfo
, чтобы клиент мог long-poll’ить прогресс и начинать DoGet
до завершения всего запроса.
Вопрос компрессии: RecordBatchStream
передает Arrow IPC поток. Если заранее закодировать словарные колонки, объем падает без потери семантики. Сжатие ZSTD/LZ4 поддерживается на уровне IPC буферов; проверяйте сборку Arrow в рантайме.
Клиент на Python: TLS, параллельные DoGet, сборка таблицы
import os
import concurrent.futures as fx
import pyarrow as pa
import pyarrow.flight as fl
def connect():
with open(os.getenv("TLS_ROOT"), "rb") as f:
roots = f.read()
# Безопасно: проверяем сертификат сервера, не отключаем верификацию
return fl.connect(("localhost", 31337), tls_root_certs=roots)
def fetch_all(client: fl.FlightClient, dataset: str) -> pa.Table:
desc = fl.FlightDescriptor.for_path(dataset.encode())
info = client.get_flight_info(desc)
# многопоточный DoGet со сборкой таблицы
def fetch_one(ep: fl.FlightEndpoint):
with client.do_get(ep.ticket) as rdr:
return rdr.read_all()
with fx.ThreadPoolExecutor(max_workers=len(info.endpoints)) as pool:
parts = list(pool.map(fetch_one, info.endpoints))
return pa.concat_tables(parts, promote=True)
if __name__ == "__main__":
c = connect()
# зальем тестовые данные
import pyarrow.compute as pc
t = pa.table({
"user_id": pa.array(range(1_000_000), type=pa.int64()),
"event": pc.dictionary_encode(pa.array(["click", "view", "buy"] * 333_334)[:1_000_000]),
})
c.do_put(fl.FlightDescriptor.for_path(b"events"), t.schema).write_table(t)
# заберем обратно
back = fetch_all(c, "events")
print(back.shape)
В прод коде обязательн добавляем таймауты на вызовы и размер батча под свое CPU/сеть. Для клиента pyarrow.flight.FlightClient
доступны лимиты на запись и настройки TLS, mTLS — в параметрах конструктора. Параллелизм на стороне клиента это сам по себе обязательный ингредиент, если сервер вернул много endpoints.
Отдельно убедитесь, что словари колоночных значений унифицированы между батчами, иначе писатель IPC может ругаться на dictionary replacement. Для работы со словарями есть pyarrow.compute.dictionary_encode
и опции унификации в IPC-писателях.
Flight SQL и ADBC: когда нужен SQL, а не только потоки
Flight сам по себе не знает про SQL. Для SQL есть Flight SQL: протокол описывает, как выполнять запросы, биндинг параметров через DoPut
, получать метаданные и стримить результат.
ADBC дает клиентскую сторону с привычным DB-API в Python и равнозначными API в Go/C/Java. Если на сервере включен Flight SQL, можно не писать свой клиент на чистом Flight, а взять adbc_driver_flightsql
. Пример: поднять простой Flight SQL сервер на DuckDB/SQLite есть в демонстрационном репозитории.
Python: DB-API через ADBC к Flight SQL
# pip install adbc_driver_flightsql
import os
import adbc_driver_flightsql.dbapi as dbapi
import pyarrow as pa
uri = os.getenv("FLIGHTSQL_URI", "flightsql://localhost:31338")
conn = dbapi.connect(uri) # TLS параметры тоже поддерживаются через URI/опции драйвера
with conn.cursor() as cur:
cur.execute("CREATE TEMP TABLE t AS SELECT * FROM range(0, 1000000) AS id")
# параметризованный запрос
cur.execute("SELECT id FROM t WHERE id >= ? AND id < ?", (100, 110))
# cursor.fetch_arrow_table() — сразу Arrow
tbl: pa.Table = cur.fetch_arrow_table()
print(tbl.num_rows, tbl.num_columns)
DB-API над Flight SQL позволяет передавать параметры без сборки собственных DoPut
сообщений. Сторона сервера может обновлять prepared-handle при биндинге параметров, чтобы стателесс-кластер держался попрепрочнее.
Go: ADBC к Flight SQL, чтение Arrow потоков
// go get github.com/apache/arrow-adbc/go/adbc
package main
import (
"context"
"fmt"
"log"
"github.com/apache/arrow-adbc/go/adbc"
_ "github.com/apache/arrow-adbc/go/adbc/driver/flightsql" // регистрируем драйвер
)
func main() {
var drv adbc.Driver
var err error
// flightsql://host:port?use_tls=true&tls_skip_verify=false ...
uri := "flightsql://localhost:31338"
drv, err = adbc.LoadDriver("flightsql", nil)
if err != nil {
log.Fatal(err)
}
db, err := drv.Open(uri, nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()
cn, err := db.OpenConnection(context.Background())
if err != nil {
log.Fatal(err)
}
defer cn.Close()
stmt, err := cn.NewStatement()
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
if err := stmt.SetSqlQuery("SELECT 42 AS answer"); err != nil {
log.Fatal(err)
}
rdr, _, err := stmt.ExecuteQuery(context.Background())
if err != nil {
log.Fatal(err)
}
defer rdr.Release()
for rdr.Next() {
rec := rdr.Record()
// обрабатываем arrow.Record — без промежуточных структур
fmt.Println(rec.NumRows(), rec.NumCols())
}
}
Go-драйверы ADBC и интерфейсы описаны в пакете github.com/apache/arrow-adbc/go/adbc
. За счет Arrow-потоков можно читать батчами без копий в родные структуры до последнего шага в бизнес-логике.
C++ Flight-сервер с хранением в Parquet и базовыми операциями
Если нужен производительный сервер на C++ без SQL, можно собрать хранилище на Parquet и отдавать потоки по Flight. В cookbook есть пример key-value сервиса, который принимает DoPut
и кладет Parquet, а DoGet
считывает и стримит обратно. По сути это каркас для собственного data-plane сервиса: меняете валидацию, схему, добавляете авторизацию и метрики.
// CMake: find_package(Arrow REQUIRED CONFIG COMPONENTS flight parquet)
#include <arrow/api.h>
#include <arrow/flight/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
using arrow::Status;
using arrow::flight::FlightDescriptor;
using arrow::flight::FlightEndpoint;
using arrow::flight::FlightInfo;
using arrow::flight::FlightServerBase;
using arrow::flight::Location;
using arrow::flight::Ticket;
class KvFlightServer : public FlightServerBase {
public:
explicit KvFlightServer(const std::string& data_dir) : data_dir_(data_dir) {}
Status GetFlightInfo(const arrow::flight::ServerCallContext&,
const FlightDescriptor& desc,
std::unique_ptr<FlightInfo>* info) override {
std::string key = desc.path[0];
// читаем схему из существующего файла Parquet
std::shared_ptr<arrow::Schema> schema = LoadSchema(key);
std::vector<FlightEndpoint> endpoints;
endpoints.emplace_back(Ticket(key), std::vector<Location>{location()});
*info = std::make_unique<FlightInfo>(*schema, desc, endpoints, -1, -1);
return Status::OK();
}
Status DoGet(const arrow::flight::ServerCallContext&, const Ticket& ticket,
std::unique_ptr<arrow::flight::FlightDataStream>* stream) override {
auto table = LoadParquet(ticket.ticket);
*stream = std::make_unique<arrow::flight::RecordBatchStream>(table);
return Status::OK();
}
Status DoPut(const arrow::flight::ServerCallContext&,
std::unique_ptr<arrow::flight::FlightMessageReader> reader,
std::unique_ptr<arrow::flight::FlightMetadataWriter> writer) override {
auto schema = reader->schema();
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
std::shared_ptr<arrow::RecordBatch> batch;
while (reader->ReadNext(&batch).ok() && batch) {
batches.push_back(batch);
}
auto table = arrow::Table::FromRecordBatches(schema, batches).ValueOrDie();
SaveParquet(reader->descriptor().path[0], table);
return Status::OK();
}
private:
std::string data_dir_;
// ... реализации LoadSchema/LoadParquet/SaveParquet с parquet::arrow
};
API C++-Flight серверов позволяет настраивать listen-адрес, graceful shutdown и прокидывать аутентификацию/мидлвари.
Компрессия, словари и IPC-нюансы, которые экономят гигабайты
При передаче строковых атрибутов (country
, device
, tag
) используем словарную кодировку. Для Python это одна строчка через pyarrow.compute.dictionary_encode
. Она уменьшает размер буферов до индексов, а словарь уходит один раз.
IPC-формат в режиме файла не допускает произвольной замены словаря между батчами, поэтому либо унифицируйте словари, либо используйте streaming-формат, либо добавляйте dictionary deltas там, где поддерживается.
Компрессия LZ4 хороша по скорости, ZSTD лучше по степени сжатия. Поддержка зависит от сборки Arrow и потребителей на другой стороне канала. В практической плоскости это часто дает кратное уменьшение трафика для широких таблиц с текстовыми колонками.
Где уже можно применить и как встраивать в существующие СУБД
Готовые серверные реализации Flight SQL есть в опенсорс-экосистеме: например, у ClickHouse есть интерфейс Arrow Flight и табличная функция для запроса внешних Flight-источников. Deephaven, Doris и StarRocks тоже поддерживают Flight SQL/ADBC в разных ролях.
Если ваша СУБД Flight SQL не поддерживает, ADBC все равно полезен как унифицированный клиентский слой: часть драйверов работает поверх родных протоколов, возвращая Arrow-потоки без участия Flight.
Закрываем круг
REST и JSON не исчезнут, они отличные для управления ресурсами и бизнес-операций. Но когда задача — быстро сдвигать столы с десятками колонок между сервисами и базами, лучше опираться на Arrow-экосистему.
Если подводить итог: строите data-plane на Flight, добавляете SQL там, где нужно, и прикручиваете ADBC/Flight SQL драйверы для клиентов. И получаете потоковую передачу данных, которая живет ближе к пределам вашей сети, а не к опорным точкам фреймворков.
Когда сталкиваешься с десятками гигабайт данных и сложными архитектурными решениями, быстро понимаешь: главная боль не только в передаче байтов, но и в выборе правильного подхода и инструментов. Ошибся — и упёрся в потолок производительности или в неподъёмные косты. Чтобы не тратить месяцы на проб и ошибок, лучше заранее разобраться в архитектурах и практиках, которые реально работают. За этим приходите на бесплатные уроки в Отус:
8 сентября в 20:00 — DWH, Data Lake и Data Lakehouse: архитектурные различия и практическое применение
23 сентября в 18:00 — Развертывание Spark кластера с помощью Terraform в облаке
Получить структурированные знания о подходах и инструментах можно на курсе "Data Engineer". Пройдите вступительный тест, чтобы узнать, подойдет ли вам программа курса.
А чтобы оставаться в курсе актуальных технологий и трендов, подписывайтесь на Telegram-канал OTUS.
ahdenchik
Как это вы волшебным образом превратите выражения SQL для одной БД в выражения SQL для другой БД?