Привет, Хабр! Меня зовут Алексей, и я тот самый программист, который до недавнего времени скептически относился к ИИ. «Очередная мода», — думал я. Но время не стоит на месте, и сейчас я активно изучаю ИИ как со стороны пользователя, так и с позиции разработчика.

Особенно интересной стала задача интеграции нашей внутренней системы управления задачами с ИИ. Типовое решение — использование векторной базы (RAG) в качестве промежуточного хранилища. Саму задачу я стал решать в режиме Vibe Coding (но об этом стоит написать отдельный пост).

Весной команда Yandex DB анонсировала поддержку векторных операций, а на недавней конференции Yandex Neuro Scale упоминалось, что теперь YDB можно использовать в качестве RAG. Но вот незадача — я нигде не нашел end-to-end примера реализации. Пришлось разбираться самостоятельно.

Что у нас получилось

Реализовал класс на JavaScript, который позволяет:

  • Сохранять документы с векторными представлениями в YDB

  • Выполнять семантический поиск по векторам

  • Работать с batch-операциями для эффективной вставки

  • Для генерации эмбеддингов используется библиотека @xenova/transformers - но можно легко поменять на что-то другое.

Код решения

// ydb-vector-store.js
import pkg from 'ydb-sdk';
const { Driver, TypedValues, Types  } = pkg;
import { pipeline } from '@xenova/transformers';

//
// VectorStore implementation used Yandex DB as a storage
//
export class YDBVectorStore {
  // major options:
  // * endpoint - endpoint to connect to database
  // * database - path to database
  // * authService - class restonsible for authentication
  // Optional options:
  // * tableName
  // * vectorDimensions
  // * batchSize
  //
  constructor(options = {}) {
    this.endpoint = options.endpoint;
    this.database = options.database;
    this.authService = options.authService;

    this.driver = null;
    this.embedder = null;
    this.initialized = false;

    this.settings = {
      tableName: options.tableName || 'vector_db',
      vectorDimensions: options.vectorDimensions || 384,
      batchSize: options.batchSize || 25,
      ...options
    };

    console.log('?️  YDB Vector Store configured:');
    console.log('   - Table:', this.settings.tableName);
    console.log('   - Dimensions:', this.settings.vectorDimensions);
    console.log('   - Endpoint:', this.endpoint);
    console.log('   - Database:', this.database);
  }

  // initialize vector database
  async initialize() {
    if (this.initialized) return;

    console.log('? Initializing YDB Vector Store...');

    try {
      // 1. Initialize YDB Driver
      this.driver = new Driver({
        endpoint: this.endpoint,
        database: this.database,
        authService: this.authService
      });

      if (!await this.driver.ready(10000)) {
        throw new Error('YDB driver failed to initialize within 10 seconds');
      }

      console.log('✅ YDB driver connected successfully');

      // 2. Loading embedding model
      console.log('? Loading embedding model...');
      this.embedder = await pipeline('feature-extraction', 'Xenova/all-MiniLM-L6-v2');

      // 3. Create table (if not exists)
      await this.createTable();

      this.initialized = true;
      console.log('✅ YDB Vector Store initialized');

    } catch (error) {
      console.error('❌ Error initializing YDB Vector Store:', error.message);

      if (this.driver) {
        await this.driver.destroy();
      }
      throw error;
    }
  }

// Cretae table (if not exists)
async createTable() {
  const query = `CREATE TABLE IF NOT EXISTS ${this.settings.tableName} (
    id Serial,
    document Text,
    embedding String,
    metadata Json,
    created_at Timestamp,
    PRIMARY KEY (id)
  )`;

  console.log(`Create Table ${this.settings.tableName}`);

  await this.driver.queryClient.do({
          fn: async (session) => {
            await session.execute({
                          text: query
            });
          },
  });

  console.log(`Table ${this.settings.tableName} created`);
}

// Add documents to the database
async addDocumentsBatch(documents, metadatas = null) {
    if (!this.initialized) await this.initialize();

    console.log('? Adding ' + documents.length + ' documents to YDB...');

    let processed = 0;
    const batchSize = this.settings.batchSize;

    for (let i = 0; i < documents.length; i += batchSize) {
      const batchDocs = documents.slice(i, i + batchSize);
      const batchMetadatas = metadatas ? metadatas.slice(i, i + batchSize) : null;

      await this.addDocumentsBatchInternal(batchDocs, batchMetadatas);

      processed += batchDocs.length;
      console.log('? Progress: ' + processed + '/' + documents.length + ' documents');

      // Add pause between batches
      if (i + batchSize < documents.length) {
        await new Promise(function(resolve) { setTimeout(resolve, 100); });
      }
    }

    console.log('✅ Added ' + documents.length + ' documents to YDB');
  }

// internal implementation of adding documents
async addDocumentsBatchInternal(documents, metadatas = null) {
  if (documents.length === 0) return;

  try {
    // Generate embeddings for all documents
    const embeddings = await Promise.all(
      documents.map(doc => this.generateEmbedding(doc))
    );

    // Prepare values for batch insert
    const values = documents.map((_, index) =>
      `($document${index}, Untag(Knn::ToBinaryStringFloat($embedding${index}), "FloatVector"), $metadata${index}, CurrentUtcTimestamp())`
    ).join(', ');

    // Create DECLARE for all params
    const declarations = documents.flatMap((_, index) => [
      `DECLARE $document${index} AS Text`,
      `DECLARE $embedding${index} AS List<Float>`,
      `DECLARE $metadata${index} AS Json`
    ]).join(';\n');

    const query = `
      ${declarations};

      INSERT INTO ${this.settings.tableName} (document, embedding, metadata, created_at)
      VALUES ${values}
    `;

    // Prepare params for all documents
    const params = {};
    for (let i = 0; i < documents.length; i++) {
      const document = documents[i];
      const metadata = metadatas ? metadatas[i] : {};
      const embedding = embeddings[i];

      params[`$document${i}`] = TypedValues.text(document);
      params[`$embedding${i}`] = TypedValues.list(Types.FLOAT, embedding);
      params[`$metadata${i}`] = TypedValues.json(JSON.stringify(metadata));
    }

    // Execute one batch request
    //console.log("Query: " + query);

    await this.driver.tableClient.withSession(async (session) => {
      await session.executeQuery(query, params);
    });

    console.log(`✅ Successfully inserted ${documents.length} documents in batch`);

  } catch (error) {
    console.error('❌ Error in batch operation:', error.message);
    throw error;
  }
}


  // generate embeddings
  async generateEmbedding(text) {
    if (!this.initialized) await this.initialize();

    const output = await this.embedder(text, {
      pooling: 'mean',
      normalize: true
    });
    return Array.from(output.data);
  }


  // perform search in Database
  async search(query, nResults = 5) {
    if (!this.initialized) await this.initialize();

    console.log('? Searching in YDB for: "' + query.substring(0, 50) + '..."');

    try {
      const queryEmbedding = await this.generateEmbedding(query);

      const topResults = await this.searchByEmbedding(queryEmbedding, nResults);

      console.log('✅ Found ' + topResults.length + ' relevant results');

      return {
        documents: topResults.map(function(r) { return r.document; }),
        metadatas: topResults.map(function(r) { return r.metadata; })
      };

    } catch (error) {
      console.error('❌ Error during search:', error);
      throw error;
    }
  }

  // internal implementation of searching by embedding
  async searchByEmbedding(embedding, nResults) {
    const query = `DECLARE $vector AS List<Float>;
                  $TargetEmbedding = Knn::ToBinaryStringFloat($vector);

                  SELECT id, document, embedding, metadata FROM ${this.settings.tableName}
                  ORDER BY Knn::CosineDistance(embedding, $TargetEmbedding)
                  LIMIT ${nResults};`;

    const params = {
                  $vector: TypedValues.list(Types.FLOAT,
                                            embedding)
                };

    const result = await this.driver.tableClient.withSession(async function(session) {
      return await session.executeQuery(query, params);
    });

    return result.resultSets[0].rows.map(function(row) {
      return {
        id: row.items[0].textValue,
        document: row.items[1].textValue,
        embedding: row.items[2].textValue,
        metadata: JSON.parse(row.items[3].textValue)
      };
    });
  }

  // return all documents
  async getAllDocuments() {
    const query = 'SELECT id, document, embedding, metadata FROM ' + this.settings.tableName;

    const result = await this.driver.tableClient.withSession(async function(session) {
      return await session.executeQuery(query);
    });

    return result.resultSets[0].rows.map(function(row) {
      return {
        id: row.items[0].textValue,
        document: row.items[1].textValue,
        embedding: JSON.parse(row.items[2].textValue),
        metadata: JSON.parse(row.items[3].textValue)
      };
    });
  }

  // return statistics
  async getStats() {
    if (!this.initialized) await this.initialize();

    const query = 'SELECT COUNT(*) as count FROM ' + this.settings.tableName;

    const result = await this.driver.tableClient.withSession(async function(session) {
      return await session.executeQuery(query);
    });

    const count = result.resultSets[0].rows[0].items[0].uint64Value;

    return {
      totalDocuments: Number(count),
      totalEmbeddings: Number(count),
      storage: 'Yandex Database',
      table: this.settings.tableName,
      database: this.database
    };
  }

  // clear all data
  async clear() {
    if (!this.initialized) await this.initialize();

    const query = 'DELETE FROM ' + this.settings.tableName;

    await this.driver.tableClient.withSession(async function(session) {
      await session.executeQuery(query);
    });

    console.log('? YDB Vector Store cleared');
  }

  // close connection to database
  async close() {
    if (this.driver) {
      await this.driver.destroy();
      console.log('? YDB connection closed');
    }
  }
}

Подводные камни, с которыми столкнулся

  1. Типизация параметров — YDB требует строгой типизации, особенно для List<Float>

  2. Batch-операции — пришлось повозиться с правильным форматом параметров

  3. Ограничения ресурсов — YDB имеет лимиты, нужна грамотная обработка RESOURCE_EXHAUSTED

Что в итоге

Получился рабочий инструмент, который уже используется для семантического поиска по задачам в нашей системе. Подход с YDB оказался удобным, особенно если вы уже используете экосистему Yandex Cloud.

Комментарии (1)


  1. PaulIsh
    10.10.2025 02:04

    В статье дается какой-то класс, но не видно как этот инструмент применяется. Нет примеров встраивания.

    По самому классу, код не выглядит ни отшлифованным, ни готовым к использованию в промышленной системе.

    Несколько примеров.

    Использование console.log. Для системы с централизованным логированием весь этот вывод должен направляться в класс/интерфейс логгера с учетом уровней логирования.

    Обработка ошибок. Наверняка часть ошибок не обязана выбрасываться наружу, а может быть обработана на месте, например, через повтор операции (бывает что сеть дала сбой и надо сделать переподключение/переотправку).

    Часть последовательных await может быть объединена в Promise.all(). Например, вот эти две строки:

    this.embedder = await pipeline('feature-extraction', 'Xenova/all-MiniLM-L6-v2');
    await this.createTable();

    Тут нам ничего не мешает создавать или проверять наличие таблицы пока выполняется какая-то сторонняя обработка.

    Вот тут не ясно зачем вам await внутри callback если вы и так промис ждете из него, а обработку ошибок внутри callback не делаете:

       const result = await this.driver.tableClient.withSession(async function(session) {
          return await session.executeQuery(query);
        });

    У вас в close делается освобождение занятых классом ресурсов. Для этого в typescript завели специальный инструмент.