«Самый ценный ресурс, который у нас есть, — это время». — Стив Джобс

Введение

В процессе освоения искусственного интеллекта и автоматизации каждый шаг представляет собой возможность погрузиться в мир новых знаний и навыков.

Бот разработан для платформы Telegram, которая не только поддерживает создание чат-ботов, но и обладает широкими возможностями для обмена графическим контентом и данными, включая голосовые сообщения. Этот опыт может быть полезен для создания чат-бота, который предоставит возможность продемонстрировать заказчику работу нейронной сети. Например нейросеть, для распознавания породы кошек по их мурлыканью.

Для создания бота был использован фреймворк aiogram, который предоставляет все необходимые инструменты для взаимодействия с Telegram API. Кроме того, использовалась Large Language Model (LLM) от Сбера. Благодаря этим инструментам бот способен кратко пересказывать содержимое видео и отвечать на вопросы пользователя. Я пробовал использовать YandexGPT, но GigaChat все же показал себя лучше, и мой выбор был сделан. Обе платформы предоставляют бесплатный период использования. Однако, есть и минус в GigaChat - за его Embeddings придется платить.

RAG (Retrieval-Augmented Generation) представляет собой метод обработки естественного языка (NLP), который объединяет два важных процесса: поиск информации и генерацию текста. Прежде всего, для работы RAG необходимо создать векторное хранилище, где каждому текстовому фрагменту соответствует вектор в многомерном пространстве. Это обеспечивает быстрый и эффективный поиск наиболее подходящих фрагментов текста для заданного запроса. Для получения вектора текстового фрагмента используется метод векторных представлений (Embeddings). Модуль langchain предоставляет интерфейс для работы с различными векторными представлениями, такими как OpenAIEmbeddings, GigaChatEmbeddings, YandexGPTEmbeddings, HuggingFaceEmbeddings и другими. Также langchain предоставляет интерфейс для работы с векторными хранилищами, такими как Chroma, Faiss и другими.

Сначала алгоритм на основе запроса пользователя ищет наиболее релевантные фрагменты текста в векторном хранилище. После этого происходит этап генерации, где модель использует найденную информацию для создания текстового ответа. Для этого langchain содержит интерфейс к множеству чат-моделей, таких как ChatOpenAI, ChatYandexGPT, GigaChat и других.

Таким образом, RAG позволяет создавать информативные и грамматически правильные ответы, используя как найденную информацию, так и возможности генерации текста модели.

Внедрение приложений является ключевым этапом для обеспечения их доступности и масштабируемости, и Docker Compose выступает как удобный инструмент в этой области. Docker используется для удобного развертывания приложения в контейнерах. Это обеспечивает изоляцию, легкость масштабирования и упрощает процесс управления зависимостями и окружением.

Реализация проекта

Проект состоит из двух контейнеров: один для Redis, который используется для хранения текстовых данных, а второй для Python, в котором функционирует Telegram бот.

Файловая структура проекта:
  • redis:

    • Директория Redis контейнера содержит файл конфигурации redis.conf

  • youtubot:

    • Директория Python контейнера содержит файлы Telegram бота:

      • Dockerfile: Файл, необходимый для сборки Docker-образа Python.

      • requirements.txt: Файл, содержащий зависимости Python проекта.

      • tests/: Директория, в которой расположены модульные тесты для проверки функциональности кода.

      • bot/: Основная директория бота, содержащая следующие файлы и поддиректории:

        • handlers/: Директория с обработчиками запросов пользователей.

        • llm.py: Файл, в котором реализованы методы для работы с Large Language Models.

      • config.py: Файл с настройками проекта.

      • main.py: Основной файл, запускающий бота.

  • docker-compose.yml:

    • Файл конфигурации Docker Compose, находится в корневой директории проекта. Этот файл определяет конфигурацию проекта и обеспечивает удобный запуск и управление контейнерами Redis и Python.

main.py содержит логику подключения к Redis, инициализацию бота, настройку обработчиков сообщений, запуск поллинга для получения сообщений от пользователей и удаление неподтвержденных сообщений. Для подключения к Redis в асинхронном режиме нужно использовать подмодуль redis.asyncio.

main.py
import os
import asyncio
import logging
import redis.asyncio as redis
import aiogram as ag
import config as cf
from bot.handlers import users

# Настройка логирования
logger = logging.getLogger(__name__)

async def main():
    # Установка уровня логирования в зависимости от переменной окружения DEBUG
    if os.getenv('DEBUG'):
        logging.basicConfig(level=logging.DEBUG, format=cf.logging_format)

    # Подключение к Redis
    db = redis.Redis(
        host=os.getenv('REDIS_HOST', 'localhost'),
        port=os.environ['REDIS_PORT'],
        password=os.getenv('REDIS_PASSWORD'),
        decode_responses=True  # Автоматическое преобразование ответов из байтов в строки
    )
    logger.info('Redis connected.')

    try:
        # Инициализация бота и диспетчера
        bot = ag.Bot(token=os.environ['TELEGRAM_BOT_TOKEN'])
        dp = ag.Dispatcher(bot=bot, storage=ag.fsm.storage.memory.MemoryStorage())

        # Настройка обработчиков
        await users.setup(bot, dp, db)
        logger.info('users.setup is done.')

        # Удаление webhook и запуск поллинга
        await bot.delete_webhook(drop_pending_updates=True)
        logger.info('Starting polling...')
        await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types())
    finally:
        # Закрытие соединения с Redis
        await db.aclose()

if __name__ == '__main__':
    # Загрузка переменных окружения из файла .env и запуск основной функции
    import dotenv
    dotenv.load_dotenv()
    asyncio.run(main())

users.py отвечает за обработку сообщений в боте. В нем создается роутер (router), который отвечает за фильтрацию и распределение сообщений по обработчикам. Определены два обработчика сообщений. Первый обработчик start_handler отвечает на команду "/start", второй обработчик message_handler для остальных сообщений. Для фильтрации сообщений, в aiogram удобно пользоваться «магическим F фильтром».

router.message.filter( ag.F.chat.type == 'private' )

разрешает все сообщения у которых переменная chat.type == "private". А декоратор

@router.message(ag.F.text, ag.filters.Command('start'))

разрешает все сообщения у которых не пустая переменная text и имеет вид команды "start".

users.py
import asyncio
import aiogram as ag
from typing import Any, Callable, Dict, Awaitable
from aiogram.types import Message, TelegramObject
from aiogram.utils.chat_action import ChatActionSender
from .. import llm

# Создание роутера для обработки сообщений
router = ag.Router()
router.message.filter(ag.F.chat.type == 'private')  # Фильтр для приватных чатов

# Обработчик команды '/start'
@router.message(ag.F.text, ag.filters.Command("start"))
async def start_handler(msg: Message):
    await msg.answer("Здравствуйте! Я могу посмотреть любой ролик из YouTube и рассказать вам о нем. "
                     "Пришлите мне ссылку на ролик и задавайте любые вопросы.")

# Обработчик всех прочих сообщений
@router.message(ag.F.text)
async def message_handler(msg: Message, llm_answer):
    # Передача пользовательского запроса в отдельную задачу
    await llm_answer.user_query(msg)

# Middleware для пользовательских запросов
class UsersMiddleware(ag.BaseMiddleware):
    def __init__(self, llm_answer):
        super().__init__()
        self.llm_answer = llm_answer
    
    async def __call__(
        self,
        handler: Callable[[ag.types.TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: ag.types.TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        # Передача ссылки на задачу обработки запросов в каждый пользовательский запрос
        data['llm_answer'] = self.llm_answer
        # Создание статуса "...печатает"
        async with ChatActionSender(bot=data['bot'], chat_id=event.chat.id, action='typing'):
            # Вызов соответствующего обработчика сообщений
            return await handler(event, data)

# Функция настройки обработчиков и маршрутизатора
async def setup(bot, dispatcher, db):
    router.message.middleware(UsersMiddleware(llm.UsersLLMAnswer(db)))
    dispatcher.include_routers(router)

Также users.py содержит класс UsersMiddleware, в котором определен метод __call__, который обрабатывает каждое сообщение, отправленное пользователем, и передает его в соответствующий обработчик. Класс ChatActionSender используется, чтобы показать статус "...печатает" в мессенджере во время обработки сообщений. В методе __call__ класса UsersMiddleware переменная data является словарем и может использоваться для передачи объектов в обработчики. Так, через переменную llm_answer в обработчик передается объект UsersLLMAnswer, содержащий логику работы с RAG. Класс UsersLLMAnswer представлен в файле llm.py. Рассмотрим его детально.

user_query
    async def user_query(self, msg):

        # получить ссылку (если она есть) на ютуб видео из запроса
        query, ids = self.get_video_id(msg.text)
        #~ print(query, ids)
        if ids:

            # если запрос содержит ссылки на видео
            if len(ids) > 1:
                await self.a_post(msg, 'Я запомнил видео по последней ссылке, потому что '
                                                        'могу просмотреть только одно видео за раз.')
            else:
                await self.a_post(msg, 'Я запомнил это видео.')

            # получить контекст пользователя по id
            user = self.user_info.setdefault(msg.from_user.id, {})

            # получить ссылку на faiss db и объект langchain
            user['chain'], faiss = await self.create_chain(ids[-1])
            if not query:
                # если в запросе была только ссылка
                # сумаризовать весь ролик
                await self.summarize(msg, ids[-1], faiss)
                await self.a_post(msg, 'Я могу рассказать еще что-нибудь по этому видео.')
                return

        if (user := self.user_info.get(msg.from_user.id)) is None:
            # если контекст пользователя еще не создан
            await self.a_post(msg, 'Я не знаю к какому видео относится запрос.\n'
                                                    'Пришлите пожалуйста ссылку на видео из YouTube.')
        else:
            await self.a_query_answer(msg, user['chain'], query)

При обработке запроса пользователя в методе get_video_id, сначала используется "регулярные выражения", чтобы найти идентификатор YouTube видео. Если идентификатор найден, создается векторное хранилище FAISS для этого видео. Если запрос содержит только ссылку на видео, делается краткий пересказ его содержания. Если в запросе есть текст, то вместо краткого пересказа используется метод RAG для генерации ответа. Для этого формируется RAG цепочка в методе create_chain.

create_chain
    async def create_chain(self, id):

        faiss = await self.create_faiss(id)
        faiss_cell = faiss.as_retriever(k=cf.chunks_by_query_from_faiss) | self.joiner
        retrieval_pass = RunnableParallel( {
            "context": faiss_cell,
            "query": RunnablePassthrough() })
        chain = retrieval_pass | self.prompt | self.llm | StrOutputParser()
        return chain, faiss


    def joiner(self, docs):
        """ Объеденить список чанков из векторного хранилища """
        sorted_docs = list(sorted(docs, key=lambda x: x.metadata['start']))
        return ' '.join(d.page_content for d in sorted_docs)


    async def create_faiss(self, video_id):

        redis_key = f'faiss_db/{video_id}'
        faiss_db = await self.redis.get(redis_key)
        if faiss_db:
            # Загрузить векторное хранилище из файла 
            return FAISS.deserialize_from_bytes( embeddings=self.embeddinger, serialized=faiss_db)
            
        # Загрузить субтитры из видео 
        transcript, language_code = self.load_youtube_transcript(video_id)

        # Создать список документов из чанков субтитров
        docs = self.join_transcript_to_docs(transcript, cf.max_chunk_size)

        # Создать векторное хранилище
        search_index = await FAISS.afrom_documents( docs, self.embeddinger )

        # Сохранить в файл векторное хранилище
        await self.redis.set(redis_key, search_index.serialize_to_bytes())      # serializes the faiss

        return search_index

Сначала, в методе create_faiss, создается или загружается ранее созданное векторное хранилище FAISS для видео по его идентификатору на YouTube. Формируется это хранилище из субтитров видео, которые загружаются в методе load_youtube_transcript. Затем эти субтитры объединются в более крупные блоки и инициализируется векторное хранилище FAISS. После этого созданное хранилище загружается в Redis.

Затем создается RAG цепочка для ответов на запросы. Фреймворк langchain помогает удобно использовать отдельные блоки LCEL и объединять их в цепочки. Первый элемент цепочки обрабатывает запрос с помощью хранилища FAISS и объекта RunnablePassthrough. Результат передается в следующую ячейку self.prompt, где запрос и результат работы векторного хранилища объединяются с промптом и направляются в LLM модель для генерации ответа. Затем последней ячейкой StrOutputParser() извлекается текст ответа из объекта ответа.

Langchain предоставляет единый интерфейс взаимодействия для различных конфигураций цепочек.

# Простой запрос ответ
text = await сhain.ainvoke("Цвет солнца")

# Потоковый запрос, ответ приходит порциями
async for chunk in chain.astream("Цвет солнца"):
	print(chunk, end="", flush=True)

# Список ответов на список запросов единовременно
text_list = await сhain.abatch(["Цвет солнца","Цвет луны"])

Для каждого из этих методов в langchain есть синхронный метод, с тем же именем, но без "a" вначале.

Один из этих методов используется в a_query_answer.

a_query_answer
    async def a_query_answer(self, msg, chain, query):
        try:
            # получить ответ от LLM на запрос полльзователя
            text = await chain.ainvoke(query)
        except GigaChatException:
            # Запись в лог всех ошибок aiogram при отправке в чат
            logger.exception('GigaChatException!')
        else:
            await self.a_post(msg, text)
            return text


    async def a_post(self, msg, text):
        """ Отправить текст в Telegram чат """
        try:
            await msg.answer(text)
        except AiogramError:
            # Запись в лог всех ошибок aiogram при отправке в чат
            logger.exception('AiogramError!')

В нем происходит передача текста запроса в RAG цепочку, а полученный ответ направляется пользователю Telegram бота.

Краткий пересказ видео ролика создается в следующем методе.

summarize
    async def summarize(self, msg, id, faiss):

        redis_key = f'summarizing/{id}'
        
        # Получить сохраненную сумаризацию из БД
        text = await self.redis.get(redis_key)
        if text:
            for chunk in text.split(b'\n'):
                await self.a_post(msg, chunk.decode())
            return

        # Получить список, сортированых по времени, чанков текста субтитров из Faiss хранилища
        docs = list(sorted(faiss.docstore._dict.values(), key=lambda x: x.metadata['start']))
        join_docs = []

        # Сделать каждый блок для сумаризации из `cf.chunks_by_query_from_faiss` штук Faiss чанков 
        for i in range(0, len(docs), cf.chunks_by_query_from_faiss):
            chunks = '\n'.join(d.page_content for d in docs[i:i+cf.chunks_by_query_from_faiss])
            join_docs.append( chunks )

        # LLM цепочка для сумаризации
        chain = self.prompt_sum | self.llm | StrOutputParser()  
        chunks = []
        for doc in join_docs:
            # Сумаризовать блок текста до 25 слов приблизительно
            text = await self.a_query_answer(msg, chain, {'max_words': '25', 'text': doc})
            if text:
                chunks.append(text.encode())
        if chunks:
            # Объеденить сумаризованные блоки и отправить в БД
            await self.redis.set(redis_key, b'\n'.join(chunks))

Для экономии, краткий пересказ также сохраняется в Redis и не генерируется повторно. Для этого пересказа нет необходимости в поиске по векторному хранилищу, поэтому langchain состоит только из трех ячеек: промпт, модели и выходного парсера.

Загрузка субтитров осуществляется в методе load_youtube_transcript с использованием модуля youtube_transcript_api. Сначала метод YouTubeTranscriptApi.list_transcripts загружает доступные типы и языки субтитров. Приоритет отдается авторским субтитрам на английском языке. Затем метод YouTubeTranscriptApi.get_transcript загружает сами субтитры.

load_youtube_transcript
    def load_youtube_transcript(self, video_id):

        # Загрузить метаданные субтитров из youtube видео
        transcript_list = yta.YouTubeTranscriptApi.list_transcripts(video_id)
        generated = set()
        manual = set()

        for tra in transcript_list:
            if tra.is_generated:
                # Создать множество доступных языков текста субтитров, созданных youtube сервисом
                generated.add(tra.language_code)
            else:
                # Создать множество доступных языков текста субтитров, созданных автором
                manual.add(tra.language_code)
        if manual:
            language_code = 'en' if 'en' in manual else manual.pop()
        else:
            language_code = 'en' if 'en' in generated else generated.pop()
        
        # Загрузить субтитры из youtube видео
        transcript = yta.YouTubeTranscriptApi.get_transcript(video_id, languages=[language_code]) # , proxies={"https": "https://user:pass@domain:port"} # format used by the requests library
        return transcript, language_code 

Некоторые методы класса UsersLLMAnswer не приведены в статье. Полный список методов и весь проект доступны на GitHub. Также добавлена возможность использования Embeddings от HuggingFace. Для этого в файле .env, помимо TELEGRAM_BOT_TOKEN и GIGACHAT_CREDENTIALS, необходимо указать переменную HUGGING_FACE_EMBEDDINGS, где можно указать имя модели, например "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2". Или оставить переменную пустой, если имеется платный доступ к GigaChat.

Выводы

Проект иллюстрирует преимущества использования доступа к GigaChat и фреймворков langchain и aiogram для создания RAG Telegram бота для анализа и обработки видеоконтента с YouTube. Благодаря этому сочетанию возможно с легкостью создать бота, способного кратко извлекать информацию из видео и отвечать на вопросы пользователей.

Удобство использования Docker Compose для деплоя проекта позволяет быстро и легко настроить его в своей среде. Этот подход делает проект доступным для широкого круга пользователей, обеспечивая простоту установки и использования.

Однако есть еще многое, куда можно стремиться. Добавление функционала "диалоговости" в проект позволило бы боту взаимодействовать с пользователями более естественным образом, что повысило бы его полезность и привлекательность. Учет родного языка пользователя для генерации ответов на его языке сделал бы бота более универсальным. В целом, проект показывает, что с доступом к GigaChat и фреймворкам langchain и aiogram можно легко создавать инновационные решения.

Комментарии (13)


  1. Andchir
    17.05.2024 15:33

    Правильно ли я понял, что используются стандартные транскрипции YouTube? По-моему они очень не качественные. Может лучше использовать Whisper?
    Кстати, ссылка на Github показывает 404.


    1. theurus
      17.05.2024 15:33

      Виспер не лучше.

      У Гугла в бесплатной версии gemini размер окна 1млн токенов и лимит 1млн в минуту. Может отвечать на вопросы по текстам практически любого размера без танцев с бубном.


      1. Andchir
        17.05.2024 15:33

        При чём тут Gemini? С чего Вы взяли, что Гугл использует его для генерации стандартных транкрипций на YouTube? Там какой-то быстрый и низкокачественный инструмент используется.


        1. jhas7
          17.05.2024 15:33

          Gemini это к статье видимо. В статье рассказывается как из платного гигачада выдавить то что джемини дает бесплатно и без танцев с бубном.


    1. stepvg Автор
      17.05.2024 15:33

      Спасибо, ссылку на GitHub поправил.
      Да, верно транскрипции стандартные.
      Транскрипции к некоторым роликам предоставляют авторы видео, думаю Whisper тогда будет хуже.
      Было бы интересно увидеть сравнительные тесты Whisper и автогенератора YouTube и тогда можно было бы понять что лучше


      1. Andchir
        17.05.2024 15:33

        Я как-то раз сравнивал. Если авторы делают транскрипции, то да, качество лучше. Но стандартные автоматические транскрипции очень низкого качества.


  1. Vindicar
    17.05.2024 15:33

    Ну т.е. опять завязываемся на какой-то сервис, который может отвалиться или изменить условия в любой момент?


    1. rPman
      17.05.2024 15:33
      +2

      ffmpeg (извлечение audio) + whisper.cpp (генерация субтитров с таймингами) + llama.cpp (открытых моделей более чем навалом, например отличная llama3-70b но 8к контекст или mixtral8x22b с 64к контекст) = решаешь свои задачи полностью оффлайн


      1. georgiyozhegov
        17.05.2024 15:33

        Там и Phi-3 от Майкрософта подъехала с контекстом 128k


        1. rPman
          17.05.2024 15:33
          +2

          Так это модели уровня 8b параметров, это другой класс задач, и с ними работать нужно по другому (если задавать им вопросы, то качество будет в лучшем случае 60%), чтобы их хорошо использовать их лучше тюнить на своих данных или форматов запросов

          По lmsys llama3-8b значительно лучше чем все другие 8b модели, но да она так же 8к контекст.

          Напоминаю, что большой контекст не значит что модель умеет его использовать, даже gpt4 с ростом информации все сильнее начинает глючить и к 64к контексту помнит 2 из 10 искомых факта.


          1. georgiyozhegov
            17.05.2024 15:33

            Согласен, просто Phi-3 можно запустить практически на любом ноуте (llama.cpp кстати хорошо запускает её). Правда там уже придётся использовать модель с 4k контекста, а это уже не так уж и много


      1. Vindicar
        17.05.2024 15:33

        Вот это была бы отличная статья. =)


    1. stepvg Автор
      17.05.2024 15:33

      Сервисы от Сбер, OpenAI или Yandex вполне можно считать надежными.
      Офлайн моделей тоже хватает, вопрос только в бюджете.