Привет, Хабр! Меня зовут Алексей, и я тот самый программист, который до недавнего времени скептически относился к ИИ. «Очередная мода», — думал я. Но время не стоит на месте, и сейчас я активно изучаю ИИ как со стороны пользователя, так и с позиции разработчика.
Особенно интересной стала задача интеграции нашей внутренней системы управления задачами с ИИ. Типовое решение — использование векторной базы (RAG) в качестве промежуточного хранилища. Саму задачу я стал решать в режиме Vibe Coding (но об этом стоит написать отдельный пост).
Весной команда Yandex DB анонсировала поддержку векторных операций, а на недавней конференции Yandex Neuro Scale упоминалось, что теперь YDB можно использовать в качестве RAG. Но вот незадача — я нигде не нашел end-to-end примера реализации. Пришлось разбираться самостоятельно.
Что у нас получилось
Реализовал класс на JavaScript, который позволяет:
Сохранять документы с векторными представлениями в YDB
Выполнять семантический поиск по векторам
Работать с batch-операциями для эффективной вставки
Для генерации эмбеддингов используется библиотека @xenova/transformers - но можно легко поменять на что-то другое.
Код решения
// ydb-vector-store.js
import pkg from 'ydb-sdk';
const { Driver, TypedValues, Types } = pkg;
import { pipeline } from '@xenova/transformers';
//
// VectorStore implementation used Yandex DB as a storage
//
export class YDBVectorStore {
// major options:
// * endpoint - endpoint to connect to database
// * database - path to database
// * authService - class restonsible for authentication
// Optional options:
// * tableName
// * vectorDimensions
// * batchSize
//
constructor(options = {}) {
this.endpoint = options.endpoint;
this.database = options.database;
this.authService = options.authService;
this.driver = null;
this.embedder = null;
this.initialized = false;
this.settings = {
tableName: options.tableName || 'vector_db',
vectorDimensions: options.vectorDimensions || 384,
batchSize: options.batchSize || 25,
...options
};
console.log('?️ YDB Vector Store configured:');
console.log(' - Table:', this.settings.tableName);
console.log(' - Dimensions:', this.settings.vectorDimensions);
console.log(' - Endpoint:', this.endpoint);
console.log(' - Database:', this.database);
}
// initialize vector database
async initialize() {
if (this.initialized) return;
console.log('? Initializing YDB Vector Store...');
try {
// 1. Initialize YDB Driver
this.driver = new Driver({
endpoint: this.endpoint,
database: this.database,
authService: this.authService
});
if (!await this.driver.ready(10000)) {
throw new Error('YDB driver failed to initialize within 10 seconds');
}
console.log('✅ YDB driver connected successfully');
// 2. Loading embedding model
console.log('? Loading embedding model...');
this.embedder = await pipeline('feature-extraction', 'Xenova/all-MiniLM-L6-v2');
// 3. Create table (if not exists)
await this.createTable();
this.initialized = true;
console.log('✅ YDB Vector Store initialized');
} catch (error) {
console.error('❌ Error initializing YDB Vector Store:', error.message);
if (this.driver) {
await this.driver.destroy();
}
throw error;
}
}
// Cretae table (if not exists)
async createTable() {
const query = `CREATE TABLE IF NOT EXISTS ${this.settings.tableName} (
id Serial,
document Text,
embedding String,
metadata Json,
created_at Timestamp,
PRIMARY KEY (id)
)`;
console.log(`Create Table ${this.settings.tableName}`);
await this.driver.queryClient.do({
fn: async (session) => {
await session.execute({
text: query
});
},
});
console.log(`Table ${this.settings.tableName} created`);
}
// Add documents to the database
async addDocumentsBatch(documents, metadatas = null) {
if (!this.initialized) await this.initialize();
console.log('? Adding ' + documents.length + ' documents to YDB...');
let processed = 0;
const batchSize = this.settings.batchSize;
for (let i = 0; i < documents.length; i += batchSize) {
const batchDocs = documents.slice(i, i + batchSize);
const batchMetadatas = metadatas ? metadatas.slice(i, i + batchSize) : null;
await this.addDocumentsBatchInternal(batchDocs, batchMetadatas);
processed += batchDocs.length;
console.log('? Progress: ' + processed + '/' + documents.length + ' documents');
// Add pause between batches
if (i + batchSize < documents.length) {
await new Promise(function(resolve) { setTimeout(resolve, 100); });
}
}
console.log('✅ Added ' + documents.length + ' documents to YDB');
}
// internal implementation of adding documents
async addDocumentsBatchInternal(documents, metadatas = null) {
if (documents.length === 0) return;
try {
// Generate embeddings for all documents
const embeddings = await Promise.all(
documents.map(doc => this.generateEmbedding(doc))
);
// Prepare values for batch insert
const values = documents.map((_, index) =>
`($document${index}, Untag(Knn::ToBinaryStringFloat($embedding${index}), "FloatVector"), $metadata${index}, CurrentUtcTimestamp())`
).join(', ');
// Create DECLARE for all params
const declarations = documents.flatMap((_, index) => [
`DECLARE $document${index} AS Text`,
`DECLARE $embedding${index} AS List<Float>`,
`DECLARE $metadata${index} AS Json`
]).join(';\n');
const query = `
${declarations};
INSERT INTO ${this.settings.tableName} (document, embedding, metadata, created_at)
VALUES ${values}
`;
// Prepare params for all documents
const params = {};
for (let i = 0; i < documents.length; i++) {
const document = documents[i];
const metadata = metadatas ? metadatas[i] : {};
const embedding = embeddings[i];
params[`$document${i}`] = TypedValues.text(document);
params[`$embedding${i}`] = TypedValues.list(Types.FLOAT, embedding);
params[`$metadata${i}`] = TypedValues.json(JSON.stringify(metadata));
}
// Execute one batch request
//console.log("Query: " + query);
await this.driver.tableClient.withSession(async (session) => {
await session.executeQuery(query, params);
});
console.log(`✅ Successfully inserted ${documents.length} documents in batch`);
} catch (error) {
console.error('❌ Error in batch operation:', error.message);
throw error;
}
}
// generate embeddings
async generateEmbedding(text) {
if (!this.initialized) await this.initialize();
const output = await this.embedder(text, {
pooling: 'mean',
normalize: true
});
return Array.from(output.data);
}
// perform search in Database
async search(query, nResults = 5) {
if (!this.initialized) await this.initialize();
console.log('? Searching in YDB for: "' + query.substring(0, 50) + '..."');
try {
const queryEmbedding = await this.generateEmbedding(query);
const topResults = await this.searchByEmbedding(queryEmbedding, nResults);
console.log('✅ Found ' + topResults.length + ' relevant results');
return {
documents: topResults.map(function(r) { return r.document; }),
metadatas: topResults.map(function(r) { return r.metadata; })
};
} catch (error) {
console.error('❌ Error during search:', error);
throw error;
}
}
// internal implementation of searching by embedding
async searchByEmbedding(embedding, nResults) {
const query = `DECLARE $vector AS List<Float>;
$TargetEmbedding = Knn::ToBinaryStringFloat($vector);
SELECT id, document, embedding, metadata FROM ${this.settings.tableName}
ORDER BY Knn::CosineDistance(embedding, $TargetEmbedding)
LIMIT ${nResults};`;
const params = {
$vector: TypedValues.list(Types.FLOAT,
embedding)
};
const result = await this.driver.tableClient.withSession(async function(session) {
return await session.executeQuery(query, params);
});
return result.resultSets[0].rows.map(function(row) {
return {
id: row.items[0].textValue,
document: row.items[1].textValue,
embedding: row.items[2].textValue,
metadata: JSON.parse(row.items[3].textValue)
};
});
}
// return all documents
async getAllDocuments() {
const query = 'SELECT id, document, embedding, metadata FROM ' + this.settings.tableName;
const result = await this.driver.tableClient.withSession(async function(session) {
return await session.executeQuery(query);
});
return result.resultSets[0].rows.map(function(row) {
return {
id: row.items[0].textValue,
document: row.items[1].textValue,
embedding: JSON.parse(row.items[2].textValue),
metadata: JSON.parse(row.items[3].textValue)
};
});
}
// return statistics
async getStats() {
if (!this.initialized) await this.initialize();
const query = 'SELECT COUNT(*) as count FROM ' + this.settings.tableName;
const result = await this.driver.tableClient.withSession(async function(session) {
return await session.executeQuery(query);
});
const count = result.resultSets[0].rows[0].items[0].uint64Value;
return {
totalDocuments: Number(count),
totalEmbeddings: Number(count),
storage: 'Yandex Database',
table: this.settings.tableName,
database: this.database
};
}
// clear all data
async clear() {
if (!this.initialized) await this.initialize();
const query = 'DELETE FROM ' + this.settings.tableName;
await this.driver.tableClient.withSession(async function(session) {
await session.executeQuery(query);
});
console.log('? YDB Vector Store cleared');
}
// close connection to database
async close() {
if (this.driver) {
await this.driver.destroy();
console.log('? YDB connection closed');
}
}
}
Подводные камни, с которыми столкнулся
Типизация параметров — YDB требует строгой типизации, особенно для List<Float>
Batch-операции — пришлось повозиться с правильным форматом параметров
Ограничения ресурсов — YDB имеет лимиты, нужна грамотная обработка RESOURCE_EXHAUSTED
Что в итоге
Получился рабочий инструмент, который уже используется для семантического поиска по задачам в нашей системе. Подход с YDB оказался удобным, особенно если вы уже используете экосистему Yandex Cloud.
PaulIsh
В статье дается какой-то класс, но не видно как этот инструмент применяется. Нет примеров встраивания.
По самому классу, код не выглядит ни отшлифованным, ни готовым к использованию в промышленной системе.
Несколько примеров.
Использование
console.log
. Для системы с централизованным логированием весь этот вывод должен направляться в класс/интерфейс логгера с учетом уровней логирования.Обработка ошибок. Наверняка часть ошибок не обязана выбрасываться наружу, а может быть обработана на месте, например, через повтор операции (бывает что сеть дала сбой и надо сделать переподключение/переотправку).
Часть последовательных await может быть объединена в Promise.all(). Например, вот эти две строки:
Тут нам ничего не мешает создавать или проверять наличие таблицы пока выполняется какая-то сторонняя обработка.
Вот тут не ясно зачем вам await внутри callback если вы и так промис ждете из него, а обработку ошибок внутри callback не делаете:
У вас в close делается освобождение занятых классом ресурсов. Для этого в typescript завели специальный инструмент.