Привет, чемпионы! Сегодня мы создадим вашего персонального аналитика источников, который будет вытаскивать самое важное из ваших любимых Telegram-каналов.

Мы соберём RAG-пайплайн, который по запросу проанализирует последние новости по интересующим темам и выдаст понятный отчёт. Разберём пошагово всю структуру и подумаем, как можно развивать и улучшить эту систему.

Как устроен наш RAG-пайплайн?

Концептуально схема выглядит так:

  1. Собираем свежие новости за последние n дней из наших любимых тг-каналов.

  2. Задаём запрос на интересующую тему.

  3. Ищем релевантные новости при помощи механизма Maximal Marginal Relevance (MMR). Данный запрос даст нам fetch_k наиболее близких документов по эмбеддингам, после чего из них он возьмет k документов, которые не только близки к запросу, но и максимально непохожи друг на друга. Делаем это мы для того, чтобы мы не получали одни и те же новости, если они перекликаются из наших источников.

  4. После чего мы обрабатываем наши релевантные документы уже с помощью reranker, чтоб уже отранжировать их лучше по смыслу сопоставляя запрос query и document

  5. И подаем все это в контекст модели, чтоб на основе этого уже она помогла нам сформировать отчет по тому, что мы для нее обработали и нашему запросу.

Сбор данных

Выберем пул каналов, которые хотим анализировать и получать по ним информацию.

После переходим в https://my.telegram.org/auth где получаем API_ID и API_HASH для эмуляции работы пользователя, далее заполняем их в конфиге нашего будущего софта.

class Config:
    API_ID = "your_api_id"              # Получи в https://my.telegram.org/auth
    API_HASH = "your_api_hash"
    PHONE = "+7xxxxxxxxxx"              # Телефон, привязанный к Telegram
    YOUR_TG_PASSW = "your_2fa_password" # Если включена двухфакторная авторизация
    CHANNELS = [                        # Список ссылок на каналы
        "https://t.me/datafeeling",
        "https://t.me/data_science_winners"
    ]
    JSON_FILE = "posts_last_week.json" # Путь к JSON-файлу с постами
    LLM_MODEL = "qwen3:1.7b"            # Модель Ollama, которую использовать

Затем собираем и сохраняем в json файл с помощью кода нашей реализации TelegramChannelScraper.

Скрытый текст
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import List, Dict

from telethon.sync import TelegramClient
from telethon.tl.functions.messages import GetHistoryRequest
from telethon.tl.types import Message
from tqdm import tqdm

from core.cfg import Config


logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")



class ChannelPost:
    def __init__(self, channel: str, message: Message):
        self.channel = channel
        self.id = message.id
        self.date = message.date
        self.text = message.message or ""

    def to_dict(self) -> Dict:
        return {
            "channel": self.channel,
            "id": self.id,
            "date": self.date.isoformat(),
            "text": self.text
        }


class PostFilter:
    def __init__(self, days_ago: int):
        self.cutoff = datetime.now(timezone.utc) - timedelta(days=days_ago)

    def is_recent(self, msg: Message) -> bool:
        return msg.date >= self.cutoff


class TelegramChannelScraper:
    def __init__(self, client: TelegramClient, filter: PostFilter):
        self.client = client
        self.filter = filter

    def scrape_channel(self, url: str, limit: int = 100) -> List[ChannelPost]:
        logging.info(f"Обработка канала: {url}")
        try:
            entity = self.client.get_entity(url)
            history = self.client(GetHistoryRequest(
                peer=entity, limit=limit,
                offset_date=None, offset_id=0,
                max_id=0, min_id=0, add_offset=0,
                hash=0
            ))

            posts = [
                ChannelPost(url, msg)
                for msg in history.messages
                if self.filter.is_recent(msg)
            ]
            logging.info(f"Найдено {len(posts)} постов в {url}")
            return posts

        except Exception as e:
            logging.error(f"Ошибка при обработке {url}: {e}")
            return []


class JSONPostSaver:
    def __init__(self, path: str):
        self.path = path

    def save(self, posts: List[ChannelPost]):
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump([p.to_dict() for p in posts], f, ensure_ascii=False, indent=2)
        logging.info(f"✅ Сохранено {len(posts)} постов в {self.path}")



def main():
    api_id = Config.API_ID
    api_hash = Config.API_HASH
    phone = Config.PHONE
    tg_pass = Config.YOUR_TG_PASSW
    channels = Config.CHANNELS

    client = TelegramClient("session_name", api_id, api_hash)
    client.start(phone=phone, password=tg_pass)

    filter = PostFilter(days_ago=14)
    scraper = TelegramChannelScraper(client, filter)

    save_path = Config.PATH_RES
    save_dir = os.path.dirname(save_path)

    if not os.path.exists(save_dir):
        os.makedirs(save_dir, exist_ok=True)

    saver = JSONPostSaver(save_path)

    all_posts = []

    logging.info("Начинаю сбор постов за последние 14 дней...")
    for url in tqdm(channels, desc="Каналы"):
        all_posts.extend(scraper.scrape_channel(url))

    saver.save(all_posts)

if __name__ == "__main__":
    main()

Этап обработки и реранжировки данных

После сбора данных, и предварительной их обработки, мы формируем модуль retrieval, который отвечает за сохранение эмбеддингов и последующий поиск релевантной информации по пользовательскому запросу.

На первом этапе модуль инициализирует хранилище векторных представлений (Chroma) с использованием предобученной модели эмбеддингов (OllamaEmbeddings). После очистки коллекции (reset_collection), в хранилище добавляются предварительно обработанные документы, преобразованные в эмбеддинги.

Затем при выполнении запроса пользователя вызывается метод search, в котором создаётся ретривер с типом поиска "mmr" (Maximal Marginal Relevance). Он позволяет выбирать наиболее релевантные и при этом разнообразные документы из векторного хранилища. Параметр k определяет количество возвращаемых уникальных документов, а fetch_k - число предварительно отбираемых кандидатов для последующего ранжирования.

Результатом работы модуля является набор документов, максимально соответствующих информационному запросу, которые затем подаются на вход следующего этапа пайплайна.

Скрытый текст
from langchain_chroma import Chroma
from langchain_ollama import OllamaEmbeddings

from core.cfg import Config



class Retriever:
    def __init__(self, documents):
        self.db = Chroma(persist_directory=Config.PERSISTENT_DIRECTORY,
                         embedding_function=OllamaEmbeddings(model=Config.EMBEDDING_MODEL))
        self.db.reset_collection()
        self.db.add_documents(documents)

    def search(self, query, k=10):
        retriever = self.db.as_retriever(search_type="mmr", search_kwargs={"k": k, "fetch_k": 20})
        return retriever.invoke(query)

Следующим этапом в нашем пайплайне является ключевой компонент - reranker. Он используется после первичного извлечения документов из векторного хранилища: когда мы уже получили набор релевантных текстов на основе близости эмбеддингов, reranker позволяет перераспределить приоритеты между ними, учитывая более глубокие семантические соответствия с пользовательским запросом.

Как правило, для этого применяются специализированные модели reranking-класса (например, из семейств cross-encoder или bge-reranker). Однако в нашем случае использование таких моделей оказалось либо недостаточно эффективным, либо неоправданно трудозатратным с учётом необходимости дополнительного обучения.

В качестве альтернативы мы применили более гибкий и в то же время мощный подход - LLM-based reranker, основанный на использовании модели qwen3:1.7b. Мы формируем промпт, в который включаем пользовательский запрос и предварительно отобранные документы, после чего просим модель отсортировать их по степени релевантности.

    prompt = (
        "Ты ассистент по поиску. Ниже дан запрос пользователя и список документов. "
        "Для каждого документа оцени степень релевантности к запросу по 10-балльной шкале "
        "(0 = нерелевантен, 10 = максимально релевантен). "
        "Ответь в формате: <номер документа>: <оценка>\n\n"
        f"Запрос: {query}\n\nДокументы:\n" +
        "\n".join([f"{i+1}. {doc}" for i, doc in enumerate(docs)]) +
        "\n\nТвои оценки:"
    )

Что позволяет нам, без необходимости дообучения специализированных моделей, получить простой, но эффективный reranking-механизм - своего рода «маленького интеллектуального ассистента», способного делать переоценку результатов, ориентируясь на полный контекст запроса.

Собираем нашего интеллектуального франкенштейна

Наконец, мы подключаем мозг всей системы, как qwen3:4b, и запускаем RAG-пайплайн по нашему запросу.

Скрытый текст
from llm_rel.llm_hadler import LLMHandler
from doc_rel.text_proc import TextProcessor
from core.cfg import Config
from doc_rel.retriver import Retriever
from utils.utils import JSONLoader
from doc_rel.reranker import ollama_rerank_langchain


class RAGSystem:
    def __init__(self):
        self.posts = JSONLoader.load_json(Config.JSON_FILE)
        self.text_processor = TextProcessor()
        self.documents = self.text_processor.process_documents(self.posts)
        self.retriever = Retriever(self.documents)
        self.llm_handler = LLMHandler(Config.LLM_MODEL)
        self.reranker = ollama_rerank_langchain

    def run_query(self, query):
        relevant_docs = self.retriever.search(query)
        doc_texts = [doc.page_content for doc in relevant_docs]
        print("Найдено документов:", len(doc_texts))
        reranked = self.reranker(query, doc_texts, top_k=5)

        if isinstance(reranked, tuple):
            reranked_texts, debug_output = reranked
            print(" Ответ LLM:\n", debug_output)
        else:
            reranked_texts = reranked

        reranked_docs = []
        for score, text in reranked_texts:
            for doc in relevant_docs:
                if doc.page_content == text:
                    reranked_docs.append(doc)
                    break

        print("Отранжированных документов:", len(reranked_docs))

        return self.llm_handler.generate_answer(query, reranked_docs)



if __name__ == "__main__":
    rag_system = RAGSystem()
    query = "Какие ключевые тренды и технологические новинки обсуждали за последнюю неделю?"
    response = rag_system.run_query(query)
    print(response)

В итоге мы получаем следующий результат по нашему запросу:

query = "Какие ключевые тренды и технологические новинки обсуждали за последнюю неделю?"

Ключевые тренды и технологические новинки, обсуждаемые в документах:

  1. ИИ в медицине: Нейросети Яндекса помогли врачам ускорить запуск клинических исследований, сократив время на анализ документации и сократив количество отклонённых заявок.

  2. Рынок LLM: Появление новых технологий и платформ (например, продукты Google, включая Veo3 и Deep Search в Gemini) изменило расстановку сил на рынке больших языковых моделей.

  3. Интеграция ИИ в мессенджеры: Интеграция ИИ (например, Grok) в Telegram, что позволяет использовать ИИ в чатах без регистрации, потеснив больших игроков вроде Google и Apple.

  4. Доступность ИИ-инструментов: Развитие платформ, позволяющих использовать ИИ в привычных чатах, снижает барьеры для широкой аудитории.

Эти тренды отражают активное развитие ИИ в различных сферах и его интеграцию в повседневные приложения.

Итог: небольшая система, которая потребует от вас 12 ГБ VRAM.

Куда дальше?

Мы можем масштабировать эту систему. У каждого юзера приложения появляется его сборщик данных с интересующих его тг каналов, после чего он отправляет запрос с тем на какую тему ему готовить отчет и собранные с его тг каналов данные в json.

Серверная же часть организует работу над тем, чтобы обработать его данные и выдавать ему по заданному пользователем времени отчёт. Масштабирование также позволит в целом взять и модельки помощнее, что думаю скажется и на дальнейших метриках качества, как и формированием пула источников для парсинга информации

Итоги

Спасибо, что дочитали до конца! Если хотите 2 часть про развитие и масштабирование этой системы, то пишите в комментариях и поддержите лайками! Надеюсь данная работа покажет вам как можно сделать "своего ассистента" дома.

Ссылку на полный репозиторий вы можете найти тут

? Ставьте лайк и пишите, какие темы разобрать дальше! Главное — пробуйте и экспериментируйте!


✔️ Присоединяйтесь к нашему Telegram-сообществу @datafeeling, где мы делимся новыми инструментами, кейсами, инсайтами и рассказываем, как всё это применимо к реальным задачам

Комментарии (0)