В этой статье узнаем, как создать систему мониторинга новостей из Telegram‑каналов и чатов с интеллектуальной фильтрацией и отправкой в целевой канал. Прототип мы реализуем на примере анализа экономических новостей.

Статья является логическим продолжением статей «Парсинг Телеграм‑каналов, групп и чатов с обработкой в LLM» и «Парсинг pdf‑отчётов публичных компаний для получения трейдерских инсайтов„.“»

Мы рассмотрим, как агрегировать новости из каналов в Telegram, убирать дубликаты и автоматически детектировать те новости, которые могут повлиять на котировки публичных компаний. Это полезно, если есть желание сыграть на скорости распространения информации и «опередить рынок» в принятии инвестиционных решений на пару минут.

Разработанный шаблон можно применить и для других сценариев. Таких, как отслеживание новостей и акций конкурентов, отслеживание упоминаний компании/темы/услуги содержащие запрос «помогите найти электрика в ЖК...» и еще множестве подобных сценариев, где нужна автоматическая обработка множества сообщений из открытых чатов и каналов.

Приступим к реализации решения

Наш проект состоит из одного основного скрипта, который выполняет Мониторинг Telegram-каналов в реальном времени, раннюю фильтрацию по ключевым словам для отсева заведомо нерелевантных сообщений, векторизацию текста с помощью OpenAI Embeddings, поиск дубликатов в векторной базе Qdrant, интеллектуальную классификацию с помощью LLM Amvera для точного определения тематики и сохранение уникальных сообщений с отправкой в целевой канал.

Проведем подготовку и подготовим списки переменных и зависимостей.

Используем следующие зависимости:

  • telethon: Асинхронный клиент для Telegram API

  • qdrant-client: Клиент для работы с векторной базой данных Qdrant

  • openai: Для создания векторных представлений текста (эмбеддингов)

  • httpx: Для асинхронных HTTP-запросов к LLM Amvera

  • python-dotenv:Для загрузки переменных окружения

Мы будем использовать несколько как обязательных, так и опциональных переменных окружения. Вот их список.

Обязательные переменные:

API\_ID = int(os.getenv("API\_ID", "0"))          ID приложения Telegram*
API\_HASH = os.getenv("API\_HASH")                Хэш приложения Telegram*
OPENAI\_API\_KEY = os.getenv("OPENAI\_API\_KEY")  Ключ для OpenAI Embeddings*
QDRANT\_URL = os.getenv("QDRANT\_URL")            URL базы данных Qdrant*
QDRANT\_API\_KEY = os.getenv("QDRANT\_API\_KEY")  Ключ API Qdrant*
LLM\_API\_KEY = os.getenv("LLM\_API\_KEY")        Ключ для LLM Amvera*

Опциональные переменные:

SOURCE\_CHANNEL = os.getenv("SOURCE\_CHANNEL")    Канал-источник для мониторинга*
DEST\_CHANNEL = os.getenv("DEST\_CHANNEL")        Целевой канал для результатов*
COLLECTION\_NAME = os.getenv("COLLECTION\_NAME", "telegram\_news")
SIMILARITY\_THRESHOLD = float(os.getenv("SIMILARITY\_THRESHOLD", "0.80"))
EMBEDDING\_MODEL = os.getenv("EMBEDDING\_MODEL", "text-embedding-3-small")

А теперь перейдем к самому важному и напишем код нашего сервиса

Скрипт начинает работу с инициализации всех необходимых клиентов и проверки их доступности:

async def init_clients():
    global openai_client, qdrant_client
    loop = asyncio.get_event_loop()

    # OpenAI embeddings client
    if OPENAI_API_KEY and OpenAI is not None:
        try:
            openai_client = OpenAI(api_key=OPENAI_API_KEY)
            logger.info("OpenAI client initialized")
        except Exception as e:
            logger.exception(f"Failed to init OpenAI client: {e}")
            openai_client = None
    else:
        if OPENAI_API_KEY and OpenAI is None:
            logger.warning("OpenAI SDK not available - install openai package or adapt code.")
        else:
            logger.warning("OPENAI_API_KEY not set - embeddings disabled.")
        openai_client = None

Перед началом работы система выполняет "preflight checks" — проверяет доступность всех внешних сервисов и при необходимости создает коллекцию в Qdrant.

Проведем предварительную фильтрация по ключевым словам.

Если мы будем полностью обрабатывать все сообщения, мы разоримся на API LLM и неприемлемо повысим нагрузку на сервис. Рациональнее руками задать слова, которые будут характеризовать потенциально интересные нам сообщения и обрабатывать только их.

В будущем можно сделать классификацию на основе ML, но пока просто зададим набор слов, которые можно менять под свои цели.

KEYWORD\_HINTS = [
"акци", "бирж", "курс", "доллар", "евро", "рубл", "цен", "инфляц",
"процент", "ставк", "рынок", "фонд", "индекс", "nasdaq", "sp", "s&p",
"дивиден", "финанс", "эконом", "рецесс", "ввп", "облигац", "доходн",
"отчет", "отчетность", "баланс", "выручк", "прибыл", "речь", "ставк"]

Чтобы адаптировать скрипт под свои нужды, просто отредактируйте список KEYWORD_HINTS, добавив термины из вашей предметной области. Например, для мониторинга криптовалют добавьте: «биткоин», «блокчейн», «эфириум».

Выполним векторизацию и поиск дубликатов

Каждое сообщение преобразуется в векторное представление с помощью OpenAI:

# ========== Embeddings / Qdrant ops ==========
async def embed_text(text: str):
    if not openai_client:
        raise RuntimeError("OpenAI client not configured")
    if not text:
        return []
    loop = asyncio.get_event_loop()
    try:
        resp = await loop.run_in_executor(None, lambda: openai_client.embeddings.create(model=EMBEDDING_MODEL, input=text))
        emb = resp.data[0].embedding
        return emb
    except Exception as e:
        logger.exception(f"Embedding failed: {e}")
        raise

Затем система ищет похожие сообщения в базе Qdrant:

Qdrant это специальная векторная база данных. Вы можете использовать альтернативы в виде расширения PostgreSQL - pgVector или ChromaDB.

async def is_similar_to_existing(vec):
    if qdrant_client is None:
        return False, 0.0, None
    loop = asyncio.get_event_loop()
    try:
        results = await loop.run_in_executor(
            None,
            lambda: qdrant_client.search(collection_name=COLLECTION_NAME, query_vector=vec, limit=5, with_payload=True, with_vectors=True)
        )
    except TypeError:
        try:
            results = await loop.run_in_executor(
                None,
                lambda: qdrant_client.search(collection_name=COLLECTION_NAME, query_vector=vec, limit=5, with_payload=True)
            )
        except Exception as e:
            logger.warning(f"Qdrant search failed (fallback): {e}")
            return False, 0.0, None
    except Exception as e:
        logger.warning(f"Qdrant search failed: {e}")
        return False, 0.0, None

    best_sim = 0.0
    best_hit = None
    for r in results:
        candidate_vector = getattr(r, "vector", None)
        if candidate_vector is None and getattr(r, "payload", None):
            candidate_vector = r.payload.get("_vector") or r.payload.get("vector")
        if candidate_vector:
            try:
                sim = cosine_sim(vec, candidate_vector)
                if sim > best_sim:
                    best_sim = sim
                    best_hit = r
            except Exception:
                logger.debug("Skipping candidate vector due to mismatch")
    return (best_sim >= SIMILARITY_THRESHOLD), best_sim, best_hit

Настроем чувствительность срабатывания

Наши сообщения могут быть на одну тему, но отличаться словами. Важно задать такую степень близости, чтобы избежать дублирования срабатывания, но и не пропустить важные новые сообщения, семантически похожие на прошлые инфоповоды, но отличные по сути.

Параметр SIMILARITY_THRESHOLD — это один из самых важных параметров в системе, который позволяет тонко настраивать баланс между обнаружением дубликатов и пропуском уникального контента.

Регулируя его, мы сможем эксперементально подобрать нужный нам порог сходства текстов.

Интеллектуальная классификация с помощью LLM

Это ядро системы. Языковая модель анализирует текст и определяет его релевантность:

async def classify_with_llm(text: str) -> Tuple[bool, str]:
    """
    Возвращает (is_relevant, reason).
    Использует LLM_BASE_URL + LLM_API_KEY.
    Если LLM недоступен или ответ не распарсить — делаем keyword fallback.
    """
    text = text.strip()
    if not text:
        return False, "empty_text"

    if not LLM_BASE_URL or not LLM_API_KEY:
        return keyword_fallback(text)

    prompt = (
        "Вы — точный классификатор. Определите, относится ли следующее сообщение к финансово-экономической "
        "тематике: акции (фондовый рынок), движение индексов, курсы валют, процентные ставки, "
        "макроэкономические показатели (ВВП, инфляция и т.п.), финансовая отчётность компаний, облигации, "
        "доходности, банковские/финансовые новости и другие экономические события.\n\n"
        "ОТВЕЧАЙТЕ ТОЛЬКО ОДНИМ КОРРЕКТНЫМ JSON-ОБЪЕКТОМ И НИЧЕМ БОЛЕЕ, В ТАКОМ ВИДЕ:\n"
        '{"relevant": true|false, "reason": "короткое объяснение на русском", "labels": ["акции","курс_валют","инфляция"]}\n\n'
        "Поле 'relevant' — true если сообщение релевантно, false — если нет. "
        "В 'labels' перечислите короткие метки (например: \"акции\", \"курс_валют\", \"инфляция\", \"фин_отчётность\", \"облигации\"). "
        "В 'reason' дайте краткое пояснение (1–2 коротких фразы).\n\n"
        f"Сообщение для анализа:\n\n{text}\n\n"
        "Возвращайте ровно один JSON-объект и ничего больше."
    )

Не меняя код, вы можете полностью изменить тематику фильтрации, просто отредактировав промпт. Например, для мониторинга ИТ-новостей измените описание тематики в промпте.

Сохранение и пересылка сообщений

Уникальные релевантные сообщения сохраняются в Qdrant и отправляются в целевой канал:

@events.register(events.NewMessage)
async def global_handler(event):
    try:
        if SOURCE_CHANNEL and str(event.chat_id) not in (str(SOURCE_CHANNEL), SOURCE_CHANNEL):
            return

        msg = event.message
        text = (msg.message or "").strip()
        if not text:
            logger.info("Empty message — skip")
            return

        # Ранняя фильтрация по ключевым словам: если нет ни одного хинта — skip
        if not has_keyword_hint(text):
            logger.info("Early keyword filter: no keyword hints found — skip")
            return

        logger.info(f"New message (id={msg.id}) — passed keyword filter, embedding...")
        try:
            emb = await embed_text(text)
        except Exception:
            logger.exception("Embedding error — skipping message")
            return

        if not emb:
            logger.warning("Empty embedding — skip")
            return

        # check similarity
        similar, sim_score, hit = await is_similar_to_existing(emb)
        logger.info(f"Similarity: {sim_score:.4f} (threshold={SIMILARITY_THRESHOLD})")
        if similar:
            logger.info("Message is similar to existing — skip sending")
            return

        # If unique, check topic relevance via LLM
        logger.info("Classifying message topic with LLM...")
        is_relevant, reason = await classify_with_llm(text)
        logger.info(f"LLM relevance: {is_relevant}, reason: {reason}")

        if not is_relevant:
            logger.info("Message not relevant to finance/economics — skip and DO NOT upsert")
            return

        # Relevant => upsert then send
        normalized = normalize_text_for_uuid(text)
        point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, normalized))
        payload = {
            "text": text,
            "chat_id": str(event.chat_id),
            "message_id": msg.id,
            "date": str(msg.date.isoformat() if getattr(msg, "date", None) else time.time()),
            "llm_relevance_reason": reason,
        }

        try:
            await upsert_point(point_id, emb, payload)
            logger.info("Upserted point into Qdrant")
        except Exception:
            logger.exception("Upsert failed — not sending to avoid duplicates")
            return

        # Send to destination channel
        if DEST_CHANNEL:
            try:
                await tg.send_message(DEST_CHANNEL, text)
                logger.info(f"Sent to destination {DEST_CHANNEL}")
            except ValueError as ve:
                logger.error("Cannot get entity for DEST_CHANNEL — ensure bot/user is member and has rights")
                logger.exception(ve)
            except Exception as e:
                logger.exception(f"Failed to send message to DEST_CHANNEL: {e}")
        else:
            logger.warning("DEST_CHANNEL not set — message not sent")

    except Exception as e:
        logger.exception(f"Unhandled error in handler: {e}")

Как адаптировать проект под свои нужды

1. Меняем ключевые слова и промты

*# Пример для мониторинга ИТ-новостей*

prompt = (
"Вы — точный классификатор. Определите, относится ли следующее сообщение к ИТ-тематике: "
"программирование, искусственный интеллект, кибербезопасность, облачные технологии, "
"стартапы, инвестиции в технологии, новые гаджеты и устройства.\n\n"
)

2. Настраиваем источники

Укажите каналы для мониторинга и целевой канал в переменных окружения:

SOURCE\_CHANNEL: username или ID канала-источника
DEST\_CHANNEL: username или ID канала для результатов

3. Регулируем параметры фильтрации

SIMILARITY\_THRESHOLD=0.90 — очень строгая фильтрация дубликатов
SIMILARITY\_THRESHOLD=0.70 — более мягкая фильтрация
Изменение KEYWORD\_HINTS — регулировка раннего отсева

4. Выбираем модели эмбеддингов

Можете выбрать свою модель, что позволит ее проще интегрировать в наш проект. Как пример:

  • text-embedding-3-small - быстрее и дешевле.

  • text-embedding-3-large - точнее, но дороже.

Проведем тестирование системы фильтрации

Посмотрим, как код отличает дубликаты от уникальных новостей. Для этого проведем серию тестов для проверки эффективности системы мониторинга и фильтрации. Вот результаты двух ключевых тестов, демонстрирующих работу алгоритма в реальных условиях.

Тест 1: Фильтрация точных дубликатов и противоположных новостей

Входные данные:

Есть как целевые, так и нецелевые сообщения
Есть как целевые, так и нецелевые сообщения

Результат фильтрации:

В результате видим только целевые сообщения
В результате видим только целевые сообщения

Рекламное сообщение про автоподушки отсеяно на этапе ранней фильтрации по ключевым словам. Система не обнаружила в тексте финансово-экономических терминов из списка KEYWORD_HINTS.

Точный дубликат новости про рост акций Oracle отфильтрован системой векторного поиска. Косинусное сходство составило ~0.98, что значительно выше порога SIMILARITY_THRESHOLD=0.80.

Фильтрацию прошла первоначальная новость о росте акций Oracle. Она распознана как релевантная финансово-экономическая информация. И контрастная новость о банкротстве Oracle. Несмотря на упоминание той же компании, содержание кардинально отличается. Векторное представление этого сообщения значительно отличается от оригинала (сходство ~0.35), поэтому система правильно классифицировала его как уникальную информацию.

Тест 2: Интеллектуальное различение похожих формулировок

Входные данные:

Три сообщения, из которых должно переслать только одно новое и уникальное.
Три сообщения, из которых должно переслать только одно новое и уникальное.

Результат фильтрации:

Скрипт не стал пересылать дубликат. Все хорошо.
Скрипт не стал пересылать дубликат. Все хорошо.

Новость про Oracle прошла фильтрацию, но не показана в результатах, так как тест фокусировался на сообщениях про МТС.

Сообщение «акции МТС торгуются на 5% больше» отфильтровано как семантический дубликат. Хотя формулировка отличается, LLM-классификатор определил, что оба сообщения передают одинаковую смысловую нагрузку о росте акций МТС на 5%. Векторное сходство составило ~0.82.

Фильтрацию прошло оригинальное сообщение о росте акций МТС на 5%. Что мы и ожидали в результате работы нашего скрипта.

Заключение

Мы создали универсальную систему мониторинга и фильтрации контента из Telegram. Такой подход можно адаптировать для таких задач, как мониторинг новостей по конкретной тематике, отслеживание упоминаний брендов, сбор рыночной аналитики и создание дайджестов контента из различных источников.

Полный код проекта доступен на GitHub.

Комментарии (0)