Здравствуйте дорогие хабровчане, в этом посте я хочу показать, как написать свой агрегатор новостей. Конечно, сразу становится очевидно, что это очередное изобретение велосипеда, однако анализируя существующие решения я всё время натыкался на камни преткновения. То они слишком медленно обновлялись, то не было нужных мне источников или часто бывало, что вообще ничего не работало без возможности починить. В итоге я написал своё решение.
Автор статьи приторговывает на бирже, и главной мотивацией было собрать все новости по интересующей теме в одном месте, чтобы не мониторить десяток различных источников вручную.
Текст под катом по большей части технический и будет, скорее всего, интересен читателям, которые сами торгуют на бирже и при этом в IT теме, либо тем, кто сам давно хотел написать агрегатор чего-нибудь.
Об агрегаторе новостей я размышлял уже давно. Во время торговли на бирже мне постоянно приходилось мониторить десяток авторитетных источников, особенно это напрягало, когда должна была выйти какая-нибудь новость, которая точно будет влиять на курс цены акций. В такие моменты было особенно сложно и обидно, когда подобную новость я пропускал. В общем, мне нужен был инструмент, с которым я мог бы оставаться в курсе всего.
Чтобы упростить понимание я написал два агрегатора, один — простой, его рассмотрю здесь. Код второго агрегатора, которым я пользуюсь сам, будет приложен в конце статьи. Простой агрегатор, в сущности, является более упрощённой версией сложного.
Основными источниками информации были телеграм каналы и новостные сайты. Для парсинга телеграма я выбрал telethon. Новости с сайтов можно забирать через RSS каналы с помощью feedparser. Однако, не на всех сайтах есть RSS, в этом случае буду парсить сайт напрямую используя scrapy. Полученные новости сливаются в отдельный телеграм канал с указанием источника.
Каждый парсер написан таким образом, чтобы его можно было запустить отдельно от остальных. Это значительно упрощает процесс добавления новых источников, их лучше проверять отдельно, чтобы убедиться в работоспособности. Например, feedparser может не прочитать RSS канал и тогда его придется парсить вручную.
Репозиторий с исходным кодом (простой парсер) — на GitHub.
1. Парсим телеграм канал

Чтобы telethon работал, необходимо для своего телеграм аккаунта создать переменные api_id и api_hash на сайте my.telegram.org и добавить эти параметры в скрипт. При первом запуске telethon сам создаёт файл с названием сессии (в нашем случае это gazp.session), его удалять не нужно, иначе придётся проходить аутентификацию ещё раз при следующем запуске.
Парсер сам по себе очень простой, считывает посты из канала @prime1 и печатает их в консоль, либо отсылает их, если в параметрах метода telegram_parser определена функция send_message_func.
from telethon import TelegramClient, events
def telegram_parser(send_message_func=None, loop=None):
'''Телеграм парсер'''
# Параметры из my.telegram.org
api_id = <Твой api_id>
api_hash = <Твой api_hash>
# Канал источник новостей @prime1
channel_source = 'https://t.me/prime1'
# Сессия клиента telethon
session = 'gazp'
client = TelegramClient(session, api_id, api_hash, loop=loop)
client.start()
@client.on(events.NewMessage(chats=channel_source))
async def handler(event):
'''Забирает посты из телеграмм каналов и посылает их в наш канал'''
if send_message_func is None:
print(event.raw_text, '\n')
else:
await send_message_func(f'@prime1\n{event.raw_text}')
return client
if __name__ == "__main__":
client = telegram_parser()
client.run_until_disconnected()2. Парсим RSS

Примечание: RSS — крайне удобная вещь, в сущности это xml-файл, который мало весит, и в котором нет ничего лишнего. Такой файл сервер отдаёт без лишней нагрузки и клиент может легко его распарсить, имея при этом минимальные задержки для обновления.
Так как парсер забирает каждый раз N новостей, то каждый раз скачиваются старые новости, которые уже были напечатаны/отправлены. Для решения этой проблемы я ввёл очередь posted_q. Ясно, что вообще все сообщения сохранять не вариант, т.к. это потребует много памяти и, в конечном счёте, приведёт к ошибке MemoryError, когда она закончится. Кроме того в большом массиве долго проверять сообщения на повтор.
Таким образом, устаревшие сообщения нужно удалять, а новые сохранять, что и происходит в очереди. Слева в неё входят свежие новости, а справа удаляются старые, т.е. хранится всего N сообщений в моменте. В качестве ключа в очередь сохраняются первые 50 символов от текста новости, что также сделано для ускорения работы скрипта.
Парсер скачивает новости с RSS канала сайта www.rbc.ru.
import httpx
import asyncio
from collections import deque
import feedparser
async def rss_parser(httpx_client, posted_q,
n_test_chars, send_message_func=None):
'''Парсер rss ленты'''
rss_link = 'https://rssexport.rbc.ru/rbcnews/news/20/full.rss'
while True:
try:
response = await httpx_client.get(rss_link)
except:
await asyncio.sleep(10)
continue
feed = feedparser.parse(response.text)
for entry in feed.entries[::-1]:
summary = entry['summary']
title = entry['title']
news_text = f'{title}\n{summary}'
head = news_text[:n_test_chars].strip()
if head in posted_q:
continue
if send_message_func is None:
print(news_text, '\n')
else:
await send_message_func(f'rbc.ru\n{news_text}')
posted_q.appendleft(head)
await asyncio.sleep(5)
if __name__ == "__main__":
# Очередь из уже опубликованных постов, чтобы их не дублировать
posted_q = deque(maxlen=20)
# 50 первых символов от текста новости - это ключ для проверки повторений
n_test_chars = 50
httpx_client = httpx.AsyncClient()
asyncio.run(rss_parser(httpx_client, posted_q, n_test_chars))3. Парсим сайт напрямую

Кастомный парсер работает так же, как и RSS парсер, за тем лишь исключением, что используется scrapy вместо feedparser и скачивается вся страница, в которой кроме новостей ещё есть куча всего. Из-за этого приходится выставлять бо́льшую паузу между обращениями, ведь если слишком активно напрягать сервер, он может и забанить на какое-то время.
Подобный вид скриптов приходится писать, если у новостного сайта нет оперативно обновляемого телеграм и/или RSS канала. Парсер скачивает новости напрямую с сайта www.bcs-express.ru.
import httpx
import asyncio
from collections import deque
from scrapy.selector import Selector
async def bcs_parser(httpx_client, posted_q,
n_test_chars, send_message_func=None):
'''Кастомный парсер сайта bcs-express.ru'''
bcs_link = 'https://bcs-express.ru/category'
while True:
try:
response = await httpx_client.get(bcs_link)
except:
await asyncio.sleep(20)
continue
selector = Selector(text=response.text)
for row in selector.xpath('//div[@class="feed__list"]/div/div')[::-1]:
raw_text = row.xpath('*//text()').extract()
title = raw_text[3]
summary = raw_text[5]
news_text = f'{title}\n{summary}'
head = news_text[:n_test_chars].strip()
if head in posted_q:
continue
if send_message_func is None:
print(news_text, '\n')
else:
await send_message_func(f'bcs-express.ru\n{news_text}')
posted_q.appendleft(head)
await asyncio.sleep(10)
if __name__ == "__main__":
# Очередь из уже опубликованных постов, чтобы их не дублировать
posted_q = deque(maxlen=20)
# 50 первых символов от текста новости - это ключ для проверки повторений
n_test_chars = 50
httpx_client = httpx.AsyncClient()
asyncio.run(bcs_parser(httpx_client, posted_q, n_test_chars))4. Запускаем все парсеры разом

Т.к. каждый парсер реализован асинхронно, то, чтобы они работали все вместе, добавим их в один общий цикл событий (event_loop). Это сделано для экономии ресурсов.
Примечание: в обычном синхронном коде, когда процесс в исполняющем потоке доходит до места, где требуются внешние ресурсы, он блокирует исполнение, ожидая ответа. При асинхронной реализации программы исполняющий поток занимается другим процессом — за счет этого и увеличивается производительность.
Тут же стоит отметить, что очередь posted_q (класс deque() модуля collections в python) является потокобезопасной, т.е. можно спокойно добавлять в неё новости из разных парсеров.
import httpx
import asyncio
from collections import deque
from telegram_parser import telegram_parser
from rss_parser import rss_parser
from bcs_parser import bcs_parser
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Канал куда скидываем новости, например @habr_agg, сюда введи свой канал
channel_habr_agg = 'https://t.me/habr_agg'
# Очередь из уже опубликованных постов, чтобы их не дублировать
posted_q = deque(maxlen=40)
# 50 первых символов от текста новости - это ключ для проверки повторений
n_test_chars = 50
async def send_message_func(message):
'''Отправляет посты в канал'''
await client.send_message(entity=channel_habr_agg, message=message)
# Телеграм парсер
client = telegram_parser(send_message_func, loop)
httpx_client = httpx.AsyncClient()
# Добавляет в текущий event_loop rss парсер
loop.create_task(rss_parser(httpx_client, posted_q,
n_test_chars, send_message_func))
# Добавляет в текущий event_loop парсер сайта bcs-express.ru
loop.create_task(bcs_parser(httpx_client, posted_q,
n_test_chars, send_message_func))
# Запускает все парсеры
client.run_until_disconnected()Заключение
Репозиторий с исходным кодом (сложный парсер) — на GitHub.
Как было сказано выше, сложный агрегатор — это усложнённый вариант агрегатора простого. Основные отличия — фильтр для постов, увеличенное количество источников новостей, логирование, доработанная обработка ошибок, имитация запроса пользователя через браузер, докер контейнер и др.
Сложный агрегатор написан таким образом, чтобы быть максимально живучим, однако в принципе состоит из тех же модулей, что и простой.
Можно конечно запустить готовый агрегатор новостей где-то в облаке, но лично у меня он работает на очень слабеньком тонком клиенте, в котором всего 4 Gb оперативной памяти и двухъядерный процессор 1.2 GHz, этого железа хватает с большим запасом. Для меня это удобно, т.к. не приходится постоянно держать включенным настольный компьютер или ноутбук, плюс тонкий клиент совершенно бесшумный.
В целом его работой я доволен, это действительно очень удобно, когда едешь куда-то или отошёл по делам, можно легко следить за новостями через мобильный телефон.
Спасибо за внимание.
UPD
Телеграм канал, на котором можно оценить работу агрегатора @gazp_news, новости добавляются в будние дни в промежутке с 9:30 до 18:45-23:00, в это время у меня обычно включен тонкий клиент.
Комментарии (2)

DimaFromMai Автор
13.10.2022 14:07UPD
Телеграм канал, на котором можно оценить работу агрегатора @gazp_news, новости добавляются в будние дни в промежутке с 9:30 до 18:45-23:00, в это время у меня обычно включен тонкий клиент.

raspberry_pi_soft
Спасибо! Полезная статья, прочитал с удовольствием !