freepik


Здравствуйте дорогие хабровчане, в этом посте я хочу показать, как написать свой агрегатор новостей. Конечно, сразу становится очевидно, что это очередное изобретение велосипеда, однако анализируя существующие решения я всё время натыкался на камни преткновения. То они слишком медленно обновлялись, то не было нужных мне источников или часто бывало, что вообще ничего не работало без возможности починить. В итоге я написал своё решение.


Автор статьи приторговывает на бирже, и главной мотивацией было собрать все новости по интересующей теме в одном месте, чтобы не мониторить десяток различных источников вручную.


Текст под катом по большей части технический и будет, скорее всего, интересен читателям, которые сами торгуют на бирже и при этом в IT теме, либо тем, кто сам давно хотел написать агрегатор чего-нибудь.


Об агрегаторе новостей я размышлял уже давно. Во время торговли на бирже мне постоянно приходилось мониторить десяток авторитетных источников, особенно это напрягало, когда должна была выйти какая-нибудь новость, которая точно будет влиять на курс цены акций. В такие моменты было особенно сложно и обидно, когда подобную новость я пропускал. В общем, мне нужен был инструмент, с которым я мог бы оставаться в курсе всего.


Чтобы упростить понимание я написал два агрегатора, один — простой, его рассмотрю здесь. Код второго агрегатора, которым я пользуюсь сам, будет приложен в конце статьи. Простой агрегатор, в сущности, является более упрощённой версией сложного.


Основными источниками информации были телеграм каналы и новостные сайты. Для парсинга телеграма я выбрал telethon. Новости с сайтов можно забирать через RSS каналы с помощью feedparser. Однако, не на всех сайтах есть RSS, в этом случае буду парсить сайт напрямую используя scrapy. Полученные новости сливаются в отдельный телеграм канал с указанием источника.


Каждый парсер написан таким образом, чтобы его можно было запустить отдельно от остальных. Это значительно упрощает процесс добавления новых источников, их лучше проверять отдельно, чтобы убедиться в работоспособности. Например, feedparser может не прочитать RSS канал и тогда его придется парсить вручную.


Репозиторий с исходным кодом (простой парсер) — на GitHub.


1. Парсим телеграм канал



Чтобы telethon работал, необходимо для своего телеграм аккаунта создать переменные api_id и api_hash на сайте my.telegram.org и добавить эти параметры в скрипт. При первом запуске telethon сам создаёт файл с названием сессии (в нашем случае это gazp.session), его удалять не нужно, иначе придётся проходить аутентификацию ещё раз при следующем запуске.


Парсер сам по себе очень простой, считывает посты из канала @prime1 и печатает их в консоль, либо отсылает их, если в параметрах метода telegram_parser определена функция send_message_func.


telegram_parser.py


Показать...
from telethon import TelegramClient, events

def telegram_parser(send_message_func=None, loop=None):
    '''Телеграм парсер'''

    # Параметры из my.telegram.org
    api_id = <Твой api_id>
    api_hash = <Твой api_hash>

    # Канал источник новостей @prime1
    channel_source = 'https://t.me/prime1'

    # Сессия клиента telethon
    session = 'gazp'

    client = TelegramClient(session, api_id, api_hash, loop=loop)
    client.start()

    @client.on(events.NewMessage(chats=channel_source))
    async def handler(event):
        '''Забирает посты из телеграмм каналов и посылает их в наш канал'''

        if send_message_func is None:
            print(event.raw_text, '\n')
        else:
            await send_message_func(f'@prime1\n{event.raw_text}')

    return client

if __name__ == "__main__":

    client = telegram_parser()

    client.run_until_disconnected()

2. Парсим RSS



Примечание: RSS — крайне удобная вещь, в сущности это xml-файл, который мало весит, и в котором нет ничего лишнего. Такой файл сервер отдаёт без лишней нагрузки и клиент может легко его распарсить, имея при этом минимальные задержки для обновления.


Так как парсер забирает каждый раз N новостей, то каждый раз скачиваются старые новости, которые уже были напечатаны/отправлены. Для решения этой проблемы я ввёл очередь posted_q. Ясно, что вообще все сообщения сохранять не вариант, т.к. это потребует много памяти и, в конечном счёте, приведёт к ошибке MemoryError, когда она закончится. Кроме того в большом массиве долго проверять сообщения на повтор.


Таким образом, устаревшие сообщения нужно удалять, а новые сохранять, что и происходит в очереди. Слева в неё входят свежие новости, а справа удаляются старые, т.е. хранится всего N сообщений в моменте. В качестве ключа в очередь сохраняются первые 50 символов от текста новости, что также сделано для ускорения работы скрипта.


Парсер скачивает новости с RSS канала сайта www.rbc.ru.


rss_parser.py


Показать...
import httpx
import asyncio
from collections import deque
import feedparser

async def rss_parser(httpx_client, posted_q,
                     n_test_chars, send_message_func=None):
    '''Парсер rss ленты'''

    rss_link = 'https://rssexport.rbc.ru/rbcnews/news/20/full.rss'

    while True:
        try:
            response = await httpx_client.get(rss_link)
        except:
            await asyncio.sleep(10)
            continue

        feed = feedparser.parse(response.text)

        for entry in feed.entries[::-1]:
            summary = entry['summary']
            title = entry['title']

            news_text = f'{title}\n{summary}'

            head = news_text[:n_test_chars].strip()

            if head in posted_q:
                continue

            if send_message_func is None:
                print(news_text, '\n')
            else:
                await send_message_func(f'rbc.ru\n{news_text}')

            posted_q.appendleft(head)

        await asyncio.sleep(5)

if __name__ == "__main__":

    # Очередь из уже опубликованных постов, чтобы их не дублировать
    posted_q = deque(maxlen=20)

    # 50 первых символов от текста новости - это ключ для проверки повторений
    n_test_chars = 50

    httpx_client = httpx.AsyncClient()

    asyncio.run(rss_parser(httpx_client, posted_q, n_test_chars))

3. Парсим сайт напрямую



Кастомный парсер работает так же, как и RSS парсер, за тем лишь исключением, что используется scrapy вместо feedparser и скачивается вся страница, в которой кроме новостей ещё есть куча всего. Из-за этого приходится выставлять бо́льшую паузу между обращениями, ведь если слишком активно напрягать сервер, он может и забанить на какое-то время.


Подобный вид скриптов приходится писать, если у новостного сайта нет оперативно обновляемого телеграм и/или RSS канала. Парсер скачивает новости напрямую с сайта www.bcs-express.ru.


bcs_parser.py


Показать...
import httpx
import asyncio
from collections import deque
from scrapy.selector import Selector

async def bcs_parser(httpx_client, posted_q,
                            n_test_chars, send_message_func=None):
    '''Кастомный парсер сайта bcs-express.ru'''

    bcs_link = 'https://bcs-express.ru/category'

    while True:
        try:
            response = await httpx_client.get(bcs_link)
        except:
            await asyncio.sleep(20)
            continue

        selector = Selector(text=response.text)

        for row in selector.xpath('//div[@class="feed__list"]/div/div')[::-1]:

            raw_text = row.xpath('*//text()').extract()

            title = raw_text[3]
            summary = raw_text[5]

            news_text = f'{title}\n{summary}'

            head = news_text[:n_test_chars].strip()

            if head in posted_q:
                continue

            if send_message_func is None:
                print(news_text, '\n')
            else:
                await send_message_func(f'bcs-express.ru\n{news_text}')

            posted_q.appendleft(head)

        await asyncio.sleep(10)

if __name__ == "__main__":

    # Очередь из уже опубликованных постов, чтобы их не дублировать
    posted_q = deque(maxlen=20)

    # 50 первых символов от текста новости - это ключ для проверки повторений
    n_test_chars = 50

    httpx_client = httpx.AsyncClient()

    asyncio.run(bcs_parser(httpx_client, posted_q, n_test_chars))

4. Запускаем все парсеры разом



Т.к. каждый парсер реализован асинхронно, то, чтобы они работали все вместе, добавим их в один общий цикл событий (event_loop). Это сделано для экономии ресурсов.


Примечание: в обычном синхронном коде, когда процесс в исполняющем потоке доходит до места, где требуются внешние ресурсы, он блокирует исполнение, ожидая ответа. При асинхронной реализации программы исполняющий поток занимается другим процессом — за счет этого и увеличивается производительность.


Тут же стоит отметить, что очередь posted_q (класс deque() модуля collections в python) является потокобезопасной, т.е. можно спокойно добавлять в неё новости из разных парсеров.


main.py


Показать...
import httpx
import asyncio
from collections import deque

from telegram_parser import telegram_parser
from rss_parser import rss_parser
from bcs_parser import bcs_parser

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Канал куда скидываем новости, например @habr_agg, сюда введи свой канал
channel_habr_agg = 'https://t.me/habr_agg'

# Очередь из уже опубликованных постов, чтобы их не дублировать
posted_q = deque(maxlen=40)

# 50 первых символов от текста новости - это ключ для проверки повторений
n_test_chars = 50

async def send_message_func(message):
    '''Отправляет посты в канал'''
    await client.send_message(entity=channel_habr_agg, message=message)

# Телеграм парсер
client = telegram_parser(send_message_func, loop)

httpx_client = httpx.AsyncClient()

# Добавляет в текущий event_loop rss парсер
loop.create_task(rss_parser(httpx_client, posted_q,
                 n_test_chars, send_message_func))

# Добавляет в текущий event_loop парсер сайта bcs-express.ru
loop.create_task(bcs_parser(httpx_client, posted_q,
                                   n_test_chars, send_message_func))

# Запускает все парсеры
client.run_until_disconnected()

Заключение


Репозиторий с исходным кодом (сложный парсер) — на GitHub.


Как было сказано выше, сложный агрегатор — это усложнённый вариант агрегатора простого. Основные отличия — фильтр для постов, увеличенное количество источников новостей, логирование, доработанная обработка ошибок, имитация запроса пользователя через браузер, докер контейнер и др.


Сложный агрегатор написан таким образом, чтобы быть максимально живучим, однако в принципе состоит из тех же модулей, что и простой.


Можно конечно запустить готовый агрегатор новостей где-то в облаке, но лично у меня он работает на очень слабеньком тонком клиенте, в котором всего 4 Gb оперативной памяти и двухъядерный процессор 1.2 GHz, этого железа хватает с большим запасом. Для меня это удобно, т.к. не приходится постоянно держать включенным настольный компьютер или ноутбук, плюс тонкий клиент совершенно бесшумный.


В целом его работой я доволен, это действительно очень удобно, когда едешь куда-то или отошёл по делам, можно легко следить за новостями через мобильный телефон.


Спасибо за внимание.


UPD
Телеграм канал, на котором можно оценить работу агрегатора @gazp_news, новости добавляются в будние дни в промежутке с 9:30 до 18:45-23:00, в это время у меня обычно включен тонкий клиент.

Комментарии (2)


  1. raspberry_pi_soft
    11.10.2022 16:58
    +5

    Спасибо! Полезная статья, прочитал с удовольствием !


  1. DimaFromMai Автор
    13.10.2022 14:07

    UPD
    Телеграм канал, на котором можно оценить работу агрегатора @gazp_news, новости добавляются в будние дни в промежутке с 9:30 до 18:45-23:00, в это время у меня обычно включен тонкий клиент.