В этой статье узнаем, как создать систему мониторинга новостей из Telegram‑каналов и чатов с интеллектуальной фильтрацией и отправкой в целевой канал. Прототип мы реализуем на примере анализа экономических новостей.
Статья является логическим продолжением статей «Парсинг Телеграм‑каналов, групп и чатов с обработкой в LLM» и «Парсинг pdf‑отчётов публичных компаний для получения трейдерских инсайтов„.“»
Мы рассмотрим, как агрегировать новости из каналов в Telegram, убирать дубликаты и автоматически детектировать те новости, которые могут повлиять на котировки публичных компаний. Это полезно, если есть желание сыграть на скорости распространения информации и «опередить рынок» в принятии инвестиционных решений на пару минут.
Разработанный шаблон можно применить и для других сценариев. Таких, как отслеживание новостей и акций конкурентов, отслеживание упоминаний компании/темы/услуги содержащие запрос «помогите найти электрика в ЖК...» и еще множестве подобных сценариев, где нужна автоматическая обработка множества сообщений из открытых чатов и каналов.
Приступим к реализации решения
Наш проект состоит из одного основного скрипта, который выполняет Мониторинг Telegram-каналов в реальном времени, раннюю фильтрацию по ключевым словам для отсева заведомо нерелевантных сообщений, векторизацию текста с помощью OpenAI Embeddings, поиск дубликатов в векторной базе Qdrant, интеллектуальную классификацию с помощью LLM Amvera для точного определения тематики и сохранение уникальных сообщений с отправкой в целевой канал.
Проведем подготовку и подготовим списки переменных и зависимостей.
Используем следующие зависимости:
telethon: Асинхронный клиент для Telegram API
qdrant-client: Клиент для работы с векторной базой данных Qdrant
openai: Для создания векторных представлений текста (эмбеддингов)
httpx: Для асинхронных HTTP-запросов к LLM Amvera
python-dotenv:Для загрузки переменных окружения
Мы будем использовать несколько как обязательных, так и опциональных переменных окружения. Вот их список.
Обязательные переменные:
API\_ID = int(os.getenv("API\_ID", "0")) ID приложения Telegram*
API\_HASH = os.getenv("API\_HASH") Хэш приложения Telegram*
OPENAI\_API\_KEY = os.getenv("OPENAI\_API\_KEY") Ключ для OpenAI Embeddings*
QDRANT\_URL = os.getenv("QDRANT\_URL") URL базы данных Qdrant*
QDRANT\_API\_KEY = os.getenv("QDRANT\_API\_KEY") Ключ API Qdrant*
LLM\_API\_KEY = os.getenv("LLM\_API\_KEY") Ключ для LLM Amvera*
Опциональные переменные:
SOURCE\_CHANNEL = os.getenv("SOURCE\_CHANNEL") Канал-источник для мониторинга*
DEST\_CHANNEL = os.getenv("DEST\_CHANNEL") Целевой канал для результатов*
COLLECTION\_NAME = os.getenv("COLLECTION\_NAME", "telegram\_news")
SIMILARITY\_THRESHOLD = float(os.getenv("SIMILARITY\_THRESHOLD", "0.80"))
EMBEDDING\_MODEL = os.getenv("EMBEDDING\_MODEL", "text-embedding-3-small")
А теперь перейдем к самому важному и напишем код нашего сервиса
Скрипт начинает работу с инициализации всех необходимых клиентов и проверки их доступности:
async def init_clients():
global openai_client, qdrant_client
loop = asyncio.get_event_loop()
# OpenAI embeddings client
if OPENAI_API_KEY and OpenAI is not None:
try:
openai_client = OpenAI(api_key=OPENAI_API_KEY)
logger.info("OpenAI client initialized")
except Exception as e:
logger.exception(f"Failed to init OpenAI client: {e}")
openai_client = None
else:
if OPENAI_API_KEY and OpenAI is None:
logger.warning("OpenAI SDK not available - install openai package or adapt code.")
else:
logger.warning("OPENAI_API_KEY not set - embeddings disabled.")
openai_client = None
Перед началом работы система выполняет "preflight checks" — проверяет доступность всех внешних сервисов и при необходимости создает коллекцию в Qdrant.
Проведем предварительную фильтрация по ключевым словам.
Если мы будем полностью обрабатывать все сообщения, мы разоримся на API LLM и неприемлемо повысим нагрузку на сервис. Рациональнее руками задать слова, которые будут характеризовать потенциально интересные нам сообщения и обрабатывать только их.
В будущем можно сделать классификацию на основе ML, но пока просто зададим набор слов, которые можно менять под свои цели.
KEYWORD\_HINTS = [
"акци", "бирж", "курс", "доллар", "евро", "рубл", "цен", "инфляц",
"процент", "ставк", "рынок", "фонд", "индекс", "nasdaq", "sp", "s&p",
"дивиден", "финанс", "эконом", "рецесс", "ввп", "облигац", "доходн",
"отчет", "отчетность", "баланс", "выручк", "прибыл", "речь", "ставк"]
Чтобы адаптировать скрипт под свои нужды, просто отредактируйте список KEYWORD_HINTS, добавив термины из вашей предметной области. Например, для мониторинга криптовалют добавьте: «биткоин», «блокчейн», «эфириум».
Выполним векторизацию и поиск дубликатов
Каждое сообщение преобразуется в векторное представление с помощью OpenAI:
# ========== Embeddings / Qdrant ops ==========
async def embed_text(text: str):
if not openai_client:
raise RuntimeError("OpenAI client not configured")
if not text:
return []
loop = asyncio.get_event_loop()
try:
resp = await loop.run_in_executor(None, lambda: openai_client.embeddings.create(model=EMBEDDING_MODEL, input=text))
emb = resp.data[0].embedding
return emb
except Exception as e:
logger.exception(f"Embedding failed: {e}")
raise
Затем система ищет похожие сообщения в базе Qdrant:
Qdrant это специальная векторная база данных. Вы можете использовать альтернативы в виде расширения PostgreSQL - pgVector или ChromaDB.
async def is_similar_to_existing(vec):
if qdrant_client is None:
return False, 0.0, None
loop = asyncio.get_event_loop()
try:
results = await loop.run_in_executor(
None,
lambda: qdrant_client.search(collection_name=COLLECTION_NAME, query_vector=vec, limit=5, with_payload=True, with_vectors=True)
)
except TypeError:
try:
results = await loop.run_in_executor(
None,
lambda: qdrant_client.search(collection_name=COLLECTION_NAME, query_vector=vec, limit=5, with_payload=True)
)
except Exception as e:
logger.warning(f"Qdrant search failed (fallback): {e}")
return False, 0.0, None
except Exception as e:
logger.warning(f"Qdrant search failed: {e}")
return False, 0.0, None
best_sim = 0.0
best_hit = None
for r in results:
candidate_vector = getattr(r, "vector", None)
if candidate_vector is None and getattr(r, "payload", None):
candidate_vector = r.payload.get("_vector") or r.payload.get("vector")
if candidate_vector:
try:
sim = cosine_sim(vec, candidate_vector)
if sim > best_sim:
best_sim = sim
best_hit = r
except Exception:
logger.debug("Skipping candidate vector due to mismatch")
return (best_sim >= SIMILARITY_THRESHOLD), best_sim, best_hit
Настроем чувствительность срабатывания
Наши сообщения могут быть на одну тему, но отличаться словами. Важно задать такую степень близости, чтобы избежать дублирования срабатывания, но и не пропустить важные новые сообщения, семантически похожие на прошлые инфоповоды, но отличные по сути.
Параметр SIMILARITY_THRESHOLD — это один из самых важных параметров в системе, который позволяет тонко настраивать баланс между обнаружением дубликатов и пропуском уникального контента.
Регулируя его, мы сможем эксперементально подобрать нужный нам порог сходства текстов.
Интеллектуальная классификация с помощью LLM
Это ядро системы. Языковая модель анализирует текст и определяет его релевантность:
async def classify_with_llm(text: str) -> Tuple[bool, str]:
"""
Возвращает (is_relevant, reason).
Использует LLM_BASE_URL + LLM_API_KEY.
Если LLM недоступен или ответ не распарсить — делаем keyword fallback.
"""
text = text.strip()
if not text:
return False, "empty_text"
if not LLM_BASE_URL or not LLM_API_KEY:
return keyword_fallback(text)
prompt = (
"Вы — точный классификатор. Определите, относится ли следующее сообщение к финансово-экономической "
"тематике: акции (фондовый рынок), движение индексов, курсы валют, процентные ставки, "
"макроэкономические показатели (ВВП, инфляция и т.п.), финансовая отчётность компаний, облигации, "
"доходности, банковские/финансовые новости и другие экономические события.\n\n"
"ОТВЕЧАЙТЕ ТОЛЬКО ОДНИМ КОРРЕКТНЫМ JSON-ОБЪЕКТОМ И НИЧЕМ БОЛЕЕ, В ТАКОМ ВИДЕ:\n"
'{"relevant": true|false, "reason": "короткое объяснение на русском", "labels": ["акции","курс_валют","инфляция"]}\n\n'
"Поле 'relevant' — true если сообщение релевантно, false — если нет. "
"В 'labels' перечислите короткие метки (например: \"акции\", \"курс_валют\", \"инфляция\", \"фин_отчётность\", \"облигации\"). "
"В 'reason' дайте краткое пояснение (1–2 коротких фразы).\n\n"
f"Сообщение для анализа:\n\n{text}\n\n"
"Возвращайте ровно один JSON-объект и ничего больше."
)
Не меняя код, вы можете полностью изменить тематику фильтрации, просто отредактировав промпт. Например, для мониторинга ИТ-новостей измените описание тематики в промпте.
Сохранение и пересылка сообщений
Уникальные релевантные сообщения сохраняются в Qdrant и отправляются в целевой канал:
@events.register(events.NewMessage)
async def global_handler(event):
try:
if SOURCE_CHANNEL and str(event.chat_id) not in (str(SOURCE_CHANNEL), SOURCE_CHANNEL):
return
msg = event.message
text = (msg.message or "").strip()
if not text:
logger.info("Empty message — skip")
return
# Ранняя фильтрация по ключевым словам: если нет ни одного хинта — skip
if not has_keyword_hint(text):
logger.info("Early keyword filter: no keyword hints found — skip")
return
logger.info(f"New message (id={msg.id}) — passed keyword filter, embedding...")
try:
emb = await embed_text(text)
except Exception:
logger.exception("Embedding error — skipping message")
return
if not emb:
logger.warning("Empty embedding — skip")
return
# check similarity
similar, sim_score, hit = await is_similar_to_existing(emb)
logger.info(f"Similarity: {sim_score:.4f} (threshold={SIMILARITY_THRESHOLD})")
if similar:
logger.info("Message is similar to existing — skip sending")
return
# If unique, check topic relevance via LLM
logger.info("Classifying message topic with LLM...")
is_relevant, reason = await classify_with_llm(text)
logger.info(f"LLM relevance: {is_relevant}, reason: {reason}")
if not is_relevant:
logger.info("Message not relevant to finance/economics — skip and DO NOT upsert")
return
# Relevant => upsert then send
normalized = normalize_text_for_uuid(text)
point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, normalized))
payload = {
"text": text,
"chat_id": str(event.chat_id),
"message_id": msg.id,
"date": str(msg.date.isoformat() if getattr(msg, "date", None) else time.time()),
"llm_relevance_reason": reason,
}
try:
await upsert_point(point_id, emb, payload)
logger.info("Upserted point into Qdrant")
except Exception:
logger.exception("Upsert failed — not sending to avoid duplicates")
return
# Send to destination channel
if DEST_CHANNEL:
try:
await tg.send_message(DEST_CHANNEL, text)
logger.info(f"Sent to destination {DEST_CHANNEL}")
except ValueError as ve:
logger.error("Cannot get entity for DEST_CHANNEL — ensure bot/user is member and has rights")
logger.exception(ve)
except Exception as e:
logger.exception(f"Failed to send message to DEST_CHANNEL: {e}")
else:
logger.warning("DEST_CHANNEL not set — message not sent")
except Exception as e:
logger.exception(f"Unhandled error in handler: {e}")
Как адаптировать проект под свои нужды
1. Меняем ключевые слова и промты
*# Пример для мониторинга ИТ-новостей*
prompt = (
"Вы — точный классификатор. Определите, относится ли следующее сообщение к ИТ-тематике: "
"программирование, искусственный интеллект, кибербезопасность, облачные технологии, "
"стартапы, инвестиции в технологии, новые гаджеты и устройства.\n\n"
)
2. Настраиваем источники
Укажите каналы для мониторинга и целевой канал в переменных окружения:
SOURCE\_CHANNEL: username или ID канала-источника
DEST\_CHANNEL: username или ID канала для результатов
3. Регулируем параметры фильтрации
SIMILARITY\_THRESHOLD=0.90 — очень строгая фильтрация дубликатов
SIMILARITY\_THRESHOLD=0.70 — более мягкая фильтрация
Изменение KEYWORD\_HINTS — регулировка раннего отсева
4. Выбираем модели эмбеддингов
Можете выбрать свою модель, что позволит ее проще интегрировать в наш проект. Как пример:
text-embedding-3-small - быстрее и дешевле.
text-embedding-3-large - точнее, но дороже.
Проведем тестирование системы фильтрации
Посмотрим, как код отличает дубликаты от уникальных новостей. Для этого проведем серию тестов для проверки эффективности системы мониторинга и фильтрации. Вот результаты двух ключевых тестов, демонстрирующих работу алгоритма в реальных условиях.
Тест 1: Фильтрация точных дубликатов и противоположных новостей
Входные данные:

Результат фильтрации:

Рекламное сообщение про автоподушки отсеяно на этапе ранней фильтрации по ключевым словам. Система не обнаружила в тексте финансово-экономических терминов из списка KEYWORD_HINTS.
Точный дубликат новости про рост акций Oracle отфильтрован системой векторного поиска. Косинусное сходство составило ~0.98, что значительно выше порога SIMILARITY_THRESHOLD=0.80.
Фильтрацию прошла первоначальная новость о росте акций Oracle. Она распознана как релевантная финансово-экономическая информация. И контрастная новость о банкротстве Oracle. Несмотря на упоминание той же компании, содержание кардинально отличается. Векторное представление этого сообщения значительно отличается от оригинала (сходство ~0.35), поэтому система правильно классифицировала его как уникальную информацию.
Тест 2: Интеллектуальное различение похожих формулировок
Входные данные:

Результат фильтрации:

Новость про Oracle прошла фильтрацию, но не показана в результатах, так как тест фокусировался на сообщениях про МТС.
Сообщение «акции МТС торгуются на 5% больше» отфильтровано как семантический дубликат. Хотя формулировка отличается, LLM-классификатор определил, что оба сообщения передают одинаковую смысловую нагрузку о росте акций МТС на 5%. Векторное сходство составило ~0.82.
Фильтрацию прошло оригинальное сообщение о росте акций МТС на 5%. Что мы и ожидали в результате работы нашего скрипта.
Заключение
Мы создали универсальную систему мониторинга и фильтрации контента из Telegram. Такой подход можно адаптировать для таких задач, как мониторинг новостей по конкретной тематике, отслеживание упоминаний брендов, сбор рыночной аналитики и создание дайджестов контента из различных источников.