Привет Хабр! Меня зовут Владимир и сегодня я буду развивать тему фишечки VectorChord про которую упомянул в предыдущей статье.

В данном материале я покажу, как поднять инфраструктуру с VectorChord, настроить VechordRegistry, написать пайплайны работы с БД, организовать гибридный поиск и добавить простейший реранкинг.

Поехали.

Настройка инфраструктуры

Начнем с настройки базы данных. Напишем docker-compose-dev.yml для развёртывания контейнера с базой данных

services:
  postgres:
    image: tensorchord/vchord-suite:pg18-latest
    environment:
      POSTGRES_DB: ${DB__NAME}
      POSTGRES_USER: ${DB__USER}
      POSTGRES_PASSWORD: ${DB__PASSWORD}
    volumes:
      - pgdata:/var/lib/postgresql
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${DB__USER} -d ${DB__NAME}"]
      interval: 5s
      timeout: 5s
      retries: 5
volumes:
  pgdata:

Проверим работоспособность. Поднимем контейнер с базой данных и увидим девственно чистую базу данных:

Как можно увидеть, мы не устанавливали в нашу базу данных никаких расширений. Это не склероз, а сделано специально, позже покажу почему.

Перед написанием основного кода скопируем из предыдущего проекта на pgvector настройки сервиса с одной доработкой - добавим к настройкам базы данных поле namespace. Оно пригодится в дальнейшем для таблиц:

class PGConfig(BaseModel):
    # предыдущий код
    namespace: str

Можно переходить к написанию кода. И первым на очереди у нас таблица данных для PostgreSQL.

Таблицы данных

Документы будем хранить в двух таблицах - одна для документа целиком, вторая - для чанков. В таблице с документами будем хранить исходные тексты. Это единица управления контентом. Таблица чанков - основа поисковой системы (документы слишком велики для поиска, поэтому их необходимо разбивать на части). Но начнём с третьей, базовой таблицы - в ней будут поля для фиксации времени создания/обновления записи и общее поле с метаданными.

# файл db_models/base.py
from datetime import datetime, timezone
from functools import partial
import msgspec
from psycopg.types.json import Jsonb
from vechord import Table

class BaseTable(Table, kw_only=True):
    metadata: Jsonb
    created_at: datetime = msgspec.field(
        default_factory=partial(datetime.now, timezone.utc))
    updated_at: datetime = msgspec.field(
        default_factory=partial(datetime.now, timezone.utc))

Таблица наследуется от базового класса таблиц vechord.Table, которая, в свою очередь, наследуется от msgspec.Struct.

Краткая справка: msgspec - это как Pydantic, только без огромной экосистемы, зато в несколько раз быстрее.

Поля created_at / updated_at формируются автоматически при создании записи. Т.к. способа автоматически обновлять значение поля updated_at (как в sqlalchemy) я не нашел, поэтому будем делать вручную в коде метода обновления записи.

Официальный User Guide рекомендует отдельно определить DenseVector = Vector[emd_dim], не будем пренебрегать советом:

from core import settings

DenseVector = Vector[settings.db.embedding_dim]

Теперь таблица для документов:

# файл db_models/document.py
import msgspec
from vechord.spec import PrimaryKeyUUID
from .base import BaseTable

class Document(BaseTable, kw_only=True):
    uid: PrimaryKeyUUID = msgspec.field(default_factory=PrimaryKeyUUID.factory)
    title: str
    text: str

Для первичного ключа (если не хотим прописывать его каждый раз вручную) у vechord есть два базовых метода - vechord.spec.PrimaryKeyAutoIncrease и vechord.spec.PrimaryKeyUUID. Первый генерирует автоинкрементируемое поле целого типа, второй - UUID. Мне привычнее UUID, поэтому использую его. Ну и последняя таблица:

# файл db_models/doc_chunk.py
from vechord.spec import ForeignKey, Keyword, PrimaryKeyUUID, Vector

class Chunk(BaseTable, kw_only=True):
    uid: PrimaryKeyUUID = msgspec.field(default_factory=PrimaryKeyUUID.factory)
    doc_id: Annotated[UUID, ForeignKey[Document.uid]]
    content: str
    content_tsv: Keyword
    embedding: DenseVector
    chunk_index: int

Поле content_tsv с типом vechord.spec.Keyword будет содержать данные для полнотекстового поиска, поле embedding - векторное представление текста из поля content. А у поля с типом ForeignKey есть одна особенность - он автоматически реализует ON DELETE CASCADE. Может быть у кого-то возникнет вопрос - “А почему поле uid не вынести в базовый класс?”. Попробуйте, у меня не вышло. Может быть я не очень разобрался в msgspec, но когда я размещал поле uid в базовой таблице, то ForeignKey[Document.uid] искал uid не в таблице Document, а в таблице BaseTable. Поэтому пришлось поле писать два раза.

Перейдём к способу создания подключения к БД и таблиц - классу VechordRegistry. Это прям швейцарский нож для базы данных. В классе есть клиент с AsyncConnectionPool, инструменты для создания таблиц, инструменты для реализации CRUD, инструменты разных видов поиска. Короче, всё что нужно в одном классе. Ну и использовать его можно как привычный асинхронный контекстный менеджер или замутить pipeline с помощью декоратора @vr.inject. Создадим этот замечательный объект в файле __init__.py

from vechord import VechordRegistry
from core import settings
from .document import Document
from .doc_chunk import Chunk

vr = VechordRegistry(
    namespace=settings.db.namespace,
    url=settings.db.db_url,
    tables=[Document, Chunk]
)

В качестве параметров в конструктор передаются namespace (используется как префикс в названиях таблиц, которые создаст наш Регистратор), url для доступа к базе данных и список необходимых таблиц. Ну а теперь к вопросу, почему я не делал инициализатор. Если покопаться в коде VechordRegistry, то можно найти вот такие интересные куски:

    async def init_extension(self):
        async with (
            await AsyncConnection.connect(self.url) as conn,
            conn.cursor() as cursor,
        ):
            await cursor.execute("CREATE EXTENSION IF NOT EXISTS vchord CASCADE")
            await cursor.execute("CREATE EXTENSION IF NOT EXISTS vchord_bm25")
            await cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_tokenizer")
            await cursor.execute(
                'SET search_path TO "$user", public, bm25_catalog, tokenizer_catalog'
            )

    async def __aenter__(self):
        await self.init_extension()

Таким образом, при первом вызове async with vr: он сам установит все необходимые расширения. Ну а если продолжить изучать код, то найдём и методы для создания таблиц, и создание базовых токенизаторов (необходимы для полнотекстового поиска). Проверим как это работает. Напишем простенький main.py файл:

import asyncio
import sys
from psycopg.types.json import Jsonb
from db_models import Document, vr

async def main():
    async with vr:
        doc = Document(title='Note', text='Some text', metadata=Jsonb({}))
        await vr.insert(doc)
        docs = await vr.select_by(Document.partial_init(), limit=1)
        print(docs)

if __name__ == '__main__':
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())

В методе мы создадим документ и сразу попытаемся его получить. Для записи / чтения базы используются собственные методы VechordRegistry. Для получения записей в метод VechordRegistry.select_by необходимо передать объект таблицы, который должен быть создан с помощью Table.partial_init(). Указанные в конструкторе значения будут использоваться для фильтрации. Посмотрим что с самой базой:

Как видим, VechordRegistry установил необходимые расширения, создал токенизаторы, таблицы и успешно добавил наш документ в БД. Перейдём к разработке сервиса-надстройки для VechordRegistry.

Сервис работы с документами

Хоть VechordRegistry сам по себе является абстракцией над голой базой данных, нам необходимо реализовать дополнительный слой абстракции. На нём, как минимум, необходимо реализовать создание и изменение чанков при соответствующих действиях с документом и метод гибридного поиска.

Начнём с создания документа. При вызове метода создания нам необходимо будет записать документ в базу данных, затем нарезать его на чанки и также сохранить из в базе. Для этой задачи идеально подойдёт vechord.registry.VechordPipeline. Как это работает: мы создаём несколько функций, выполняющих обработку данных. Каждую функцию необходимо продекорировать с помощью @vr.inject указав в качестве параметра входную (для извлечения данных из БД), выходную (для сохранения данных в БД) или обе таблицы. Далее мы создаём объект класса VechordPipeline, в который передадим список наших функций. Первая функция будет использоваться для получения входных данных, последняя - для возврата выходных. Остальные функции будут использоваться для промежуточной обработки данных. Функции будут выполняться в порядке их следования в списке, формируя конвейер обработки.

Напишем пайплайн создания документа. В нём будет два этапа - создание документа и создание чанков текста документа. Но начнем пайплайн с входной Pydantic схемы, которую в дальнейшем будем использовать в API:

# файл schemas/document.py
from typing import Annotated, Any
from pydantic import BaseModel, Field

class DocumentBase(BaseModel):
    title: Annotated[str, Field(..., description='Заголовок документа')]
    text: Annotated[str, Field(..., description='Содержимое документа')]
    meta_data: Annotated[
        dict[str, Any],
        Field(default_factory=dict, description='Метаданные документа')]

class DocumentCreate(DocumentBase):
    pass

Как и раньше, делаем базовую модель, от неё наследуем модель создания. Теперь перейдём к реализации шагов пайплайна. Из-за особенностей работы декоратора их нельзя поместить в тело класса - self будет считаться параметром, необходимым к получению из базы данных, а не ссылкой на объект класса. Поэтому вынесем шаги в отдельный файл. Начнём с create_doc:

# файл services/utils.py
from psycopg.types.json import Jsonb
from db_models import Document, vr
from schemas.document import DocumentCreate

@vr.inject(output=Document)
async def _create_document(doc_data: DocumentCreate) -> Document:
    doc = Document(
        title=doc_data.title,
        text=doc_data.text,
        metadata=Jsonb(doc_data.metadata)
    )
    return doc

Декоратор @vr.inject с параметром output=Document автоматически сохранит созданый объект в БД. Теперь метод для создания чанков. С ним немного посложнее, т.к. для него нам необходимы инструменты для нарезания текста и получения эмбеддингов. Пока просто используем заглушки:

# файл services/utils.py
@vr.inject(input=Document, output=Chunk)
async def create_chunks(uid: UUID, text: str) -> list[Chunk]:
    chunks = await _chunker.segment(text)
    return [
        Chunk(
            doc_id=uid,
            content=chunk,
            content_tsv=Keyword(chunk),
            embedding=DenseVector(await _embedder.vectorize_chunk(chunk)),
            metadata=Jsonb({}),
            chunk_index=i
        )
        for i, chunk in enumerate(chunks, start=1)
    ]

В параметры декоратора @vr.inject передаём, что вход у нас это таблица Document, а выход - Chunk. В параметрах указываем именно класс таблицы, а не что возвращает метод, поэтому без list[]. Во входных параметрах функции необходимо указать имена полей таблицы Document которые хотим видеть в методе. Соберём пайплайн:

# файл services/document.py
from db_models import vr
from .utils import create_chunks, create_document

class DocumentService:
    async def create_document(self, doc_data: DocumentCreate):
        pipeline = vr.create_pipeline([self._create_document, self._create_chunks])
        await pipeline.run(doc_data)

Готовенько. Время протестить. Но сначала определимся с чанкером и эмбеддером.

Для получения чанков в библиотеке vechord.chunk имеется три класса: RegexChunker, SpacyChunker и GeminiChunker. RegexChunker просто разбивает по определённому набору символов через регулярку, GeminiChunker требует API-ключ. Остаётся SpacyChunker на базе библиотеки spacy. Чтобы SpacyChunker заработал, надо доустановить библиотеку spacy и загрузить модель для чанкирования. Для русского языка доступно три модели: ru_core_news_sm, ru_core_news_md и ru_core_news_lg. Отличаются размером, точностью и скоростью работы. Возьмем среднюю:

uv add spacy==3.8.13

python -m spacy download ru_core_news_md

Если не получается, можно скачать модель с гитхаба и установить напрямую

uv add <путь/к/файлу>

С эмбеддингами всё немного посложнее. В vechord.embedding есть эмбеддеры от Gemini, Jina, Voyage и даже OpenAI. Но у всех одна проблема - им нужен API-ключ. Есть класс SpacyDenseEmbedding, но тут даже сами разработчики предупреждают, что это не серьёзно. Но, т.к. весь этот материал - скорее эксперимент, чем production ready продукт, воспользуемся именно SpacyDenseEmbedding. Создадим экземпляры:

from vechord.chunk import SpacyChunker
from vechord.embedding import SpacyDenseEmbedding

@lru_cache
def get_chunker() -> SpacyChunker:
    ch = SpacyChunker(model='ru_core_news_md')
    return ch

@lru_cache
def get_embedder() -> SpacyDenseEmbedding:
    emb = SpacyDenseEmbedding(model='ru_core_news_md')
    return emb

_chunker = get_chunker()
_embedder = get_embedder()

Проверим работоспособность пайплайна. Перепишем наш main.py:

from services import DocumentService
from schemas.document import DocumentCreate


async def main():
    async with vr:
        await DocumentService.create_document(
            DocumentCreate(title='Тестовый документ', text="""Длинный текст документа"""))

Смотрим результат

Отлично. Документ загрузился, поделился на чанки, для каждого чанка получен вектор и какие-то ключевые слова. Пайплайн работает. Однако в текущем виде он только пишет в БД. Хорошим тоном является возвращать созданный объект (ну или как минимум его первичный ключ). Реализуем. И как всегда начинаем со схемы ответа:

# файл schemas/document.py
class DocumentResponse(DocumentBase):
    uid: Annotated[UUID, Field(..., description='Идентификатор документа')]
    created_at: Annotated[
        datetime, Field(..., description='Дата и время создания документа')]
    updated_at: Annotated[
        datetime,
        Field(..., description='Дата и время последнего обновления документа')]
    model_config = ConfigDict(from_attributes=True)

Схема есть. Теперь напишем финальный шаг для пайплайна создания:

# файл services/utils.py
@vr.inject(input=Document)
async def get_document(uid: UUID) -> DocumentResponse | None:
    docs = await vr.select_by(Document.partial_init(uid=uid))
    return DocumentResponse.model_validate(docs[0]) if docs else None

Не понял, баг это или фича, но сразу получить объект класса Document нельзя - @vr.inject позволяет читать только конкретные поля таблицы, переданной в параметре input. Поэтому берём uid, вручную читаем объект с БД и возвращаем его (либо None, если что-то пошло не так).

Теперь обновление. В документации и коде явного update метода я не нашел. Поэтому лютый колхоз. Пишем update модель

# файл schemas/document.py
class DocumentUpdate(BaseModel):
    title: Annotated[str | None, Field(None, description='Заголовок документа')]
    text: Annotated[str | None, Field(None, description='Содержимое документа')]
    metadata: Annotated[
        dict[str, Any],
        Field(default_factory=dict, description='Метаданные документа')]

    def is_empty(self) -> bool:
        return all([
            self.title is None,
            self.text is None,
            not self.metadata
        ])

Это та же модель создания документа, но все поля опциональны. Метод is_empty добавлен, чтобы проще валидировать наличие данных в модели. Теперь реализуем саму функцию update:

# файл services/utils.py
@vr.inject(output=Document)
async def update_document(uid: UUID, doc_data: DocumentUpdate) -> Document:
    docs = await vr.select_by(Document.partial_init(uid=uid))
    if not docs:
        raise ValueError(f'Document {uid} not found')
    doc = docs[0]
    new_doc = Document(
        title=doc_data.title if doc_data.title is not None else doc.title,
        text=doc_data.text if doc_data.text is not None else doc.text,
        metadata=Jsonb(doc_data.metadata) if doc_data.metadata is not None else doc.metadata,
        updated_at=datetime.now(timezone.utc),
        created_at=doc.created_at,
        uid=doc.uid,
    )
    await vr.remove_by(Document.partial_init(uid=uid))
    return new_doc

При обновлении мы должны сохранить ID и дату создания. Поэтому, получаем документ, создаём новый документ с частями старого (время создания и uid) и удаляем старый. При удалении старого документа вместе с ним удалятся и его чанки (из-за ON DELETE CASCADE при объявлении поля vechord.spec.ForeignKey). Далее новый документ записывается в базу данных. Теперь пайплайн

class DocumentService:
    # предыдущий код
    @staticmethod
    async def update_document(uid: UUID, doc_data: DocumentUpdate) -> DocumentResponse | None:
        pipeline = vr.create_pipeline([update_document, create_chunks, get_document])
        return await pipeline.run(uid=uid, doc_data=doc_data)

Дополним наш main.py строчкой:

# Где-то после создания документа
await DocumentService.update_document(
    uid=UUID(<UUID созданного документа>), doc_data=DocumentUpdate(title='Изменённый текстовый документ')
)

Проверим:

Работает - время создания документа старое, а время обновления и чанки - новые. Добиваем наш сервис методами получения/удаления в виде простой прослойки и переходим к поиску

class DocumentService:
    # предыдущий код
    @staticmethod
    async def get_document(uid: UUID) -> DocumentResponse | None:
        docs = await vr.select_by(Document.partial_init(uid=uid))
        return DocumentResponse.model_validate(docs[0]) if docs else None
    @staticmethod
    async def list_documents(limit: int | None = None) -> list[DocumentResponse]:
        docs = await vr.select_by(Document.partial_init(), limit=limit)
        return [DocumentResponse.model_validate(doc) for doc in docs]
    @staticmethod
    async def delete_document(uid: UUID) -> None:
        await vr.remove_by(Document.partial_init(uid=uid))

Сервис реализации поиска

По традиции, начнем реализацию поиска со схем:

# файл schemas/search.py
class SearchBase(BaseModel):
    query: Annotated[str, Field(..., description='Поисковый запрос')]
    topk : Annotated[
        int, Field(10, ge=1, description='Количество результатов для возврата')]
class VectorSearchRequest(SearchBase):
    probe: Annotated[
        int | None, Field(None, description='Сколько кластеров K-means нужно проверить')]
class KeywordSearchRequest(SearchBase):
    pass

В схемах поисковых запросов просто повторяем параметры соответствующих методов VechordRegistry, за одним исключением - векторизовать запросы мы будем уже в нашем сервисе, тем же эмбеддером, которым получали векторы чанков. Поэтому поле запроса - строка.

Теперь схема возврата результата:

# файл schemas/search.py
class ChunkResponse(BaseModel):
    uid: Annotated[UUID, Field(..., description='Идентификатор чанка')]
    doc_id: Annotated[
        UUID, Field(..., description='Идентификатор документа')]
    content: Annotated[str, Field(..., description='Содержимое чанка')]
    created_at: Annotated[
        datetime, Field(..., description='Дата и время создания чанка')]
    updated_at: Annotated[
        datetime,
        Field(..., description='Дата и время последнего обновления чанка')]
    model_config = ConfigDict(from_attributes=True)

Теперь реализуем сервис поиска. Начнем реализацию с базовых методов поиска - векторного и полнотекстового:

# файл services/search.py
class SearchService:
    @staticmethod
    async def vector_search(request: VectorSearchRequest) -> list[ChunkResponse]:
        vector = await _embedder.vectorize_chunk(request.query)
        results = await vr.search_by_vector(
            Chunk, vec=vector, topk=request.topk, probe=request.probe)
        return [ChunkResponse.model_validate(r) for r in results]
    @staticmethod
    async def keyword_search(query: KeywordSearchRequest) -> list[ChunkResponse]:
        results = await vr.search_by_keyword(Chunk, keyword=query.keyword, topk=query.topk)
        return [ChunkResponse.model_validate(r) for r in results]

Для векторного поиска сначала получаем вектор запроса. Ну а в остальном, методы просто передают параметры в методы класса VechordRegistry.

Ну и самое интересное - гибридный поиск

Гибридный поиск

Гибридный поиск обеспечивает высокую полноту выдачи (recall), объединяя семантический и ключевой поиск. Это позволяет находить документы как по смыслу, так и по точным совпадениям слов, компенсируя слабые стороны каждого метода в отдельности. Однако, методы первого этапа часто жертвуют точностью ради скорости. Здесь на помощь приходит реранкинг: финальный отбор лучших. Модель оценивает контекстное соответствие запроса и документа, исправляя ошибки первого этапа. Таким образом система сочетает скорость поиска по большим данным с точностью понимания смысла, выводя на первые позиции только то, что действительно нужно пользователю.

Библиотека vechord предоставляет свои реранкеры. Но, как и в случае с эмбеддерами, CohereReranker и JinaReranker требуют API ключи. Остаётся простейший ReciprocalRankFusion, названый так в честь используемого алгоритма. Название можно перевести как “Объединение обратных рангов”, и оно фактически описывает алгоритм - итоговый ранг документа формируется путём суммирования обратных величин его позиций в результатах поиска с добавлением константы для сглаживания влияния крайних значений. В итоге, чем выше документ в отдельных списках и чем чаще он встречается на верхних позициях, тем выше его итоговый ранг:

\text{score}(d) = \sum_{i=1}^{n} \frac{1}{k + \text{rank}_i(d)}

где \text{score}(d) - итоговый балл документа d, n - количество списков результатов, \text{rank}_i(d) - позиция документа d в i-м списке, k - константа (обычно 60).

Перейдём от теории к практике. Для начала реализуем модель запроса

class HybridSearchRequest(VectorSearchRequest, KeywordSearchRequest):
    boost: Annotated[
        int, Field(3, ge=1, description='Коэффициент расширения выборки')]

Параметр boost нужен для повышения итоговой релевантности, т.к. если передать реранкеру ровно topk документов, то ему не из чего выбирать - он просто переставит местами уже отобранные документы, не улучшив качество выборки. Далее сам метод

class SearchService:
    # предыдущий код
    @staticmethod
    async def hybrid_search_fuse(request: HybridSearchRequest) -> list[ChunkResponse]:
        rrf = ReciprocalRankFusion()
        vector = await _embedder.vectorize_chunk(request.query)
        return rrf.fuse(
            [
                await vr.search_by_vector(
                    Chunk, vec=vector, topk=request.topk * request.boost, probe=request.probe),
                await vr.search_by_keyword(Chunk, keyword=request.keyword, topk=request.topk * request.boost)
            ]
        )[:request.topk]

С поиском как-бы всё.

API

API расписывать подробно не буду, т.к. фактически его копируем из репозитория (который git) для первой статьи и заменяем вызов методов репозитория (который паттерн) на вызов соответствующих методов сервиса. Отметить стоит, что из-за особенности VechordPipeline последний метод пайплайна возвращает список значений, поэтому в API для создания документа мы используем:

async def create_document(data: DocumentCreate):
    res = await DocumentService.create_document(data)
    return res[0]

Остался main.py. Он стандартный для FastAPI приложения, прокомментирую только метод lifespan:

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with vr:
        yield

В lifespan мы открываем пул соединений нашего VechordRegistry, который будет работать, пока работает приложение.

Внезапное окончание

Что имеем на текущий момент:

  • асинхронная работа с PostgreSQL через VechordRegistry

  • автоматическое разбиение документов на чанки (SpacyChunker) и получение эмбеддингов (SpacyDenseEmbedding)

  • полнотекстовый, векторный и гибридный поиск с реранкингом (ReciprocalRankFusion)

  • базовый API для документов с каскадным обновлением чанков

В текущей реализации эмбеддер и чанкер — игрушечные (из библиотеки spacy). Для боевого использования стоит заменить их на более серьёзные модели. В следующей части как раз этим и займемся - реализуем наследование от базовых классов, используя различные варианты.

Код проекта доступен тут.

Продолжение тут

Комментарии (0)