Разберу простую задачу, получение rss-ленты, и то, чем будет отличаться код, который просто получает ленту, от того, который собственно используется в производстве. 

Надеюсь материал будет полезен начинающим программистам и покажет, как примерно должна осуществляться разработка с прицелом на получение результата применимого в проектах. 

Всё же работает, почему не берут?

Получение ответа сервера по гиперссылке_Kandinsky 2.1
Получение ответа сервера по гиперссылке_Kandinsky 2.1

Начнем с конфликта: решение задачи получения rss-ленты выглядит очень просто, вот так:

import requests
import feedparser

response = requests.get('https://lenta.ru/rss')
lenta = feedparser.parse(response.text)

for items in lenta['entries']:
    print(items["title"])

Для одноразового удовлетворения любопытства вполне достаточно, но что если нам нужно получать несколько рсс-лент в плотном режиме (например каждую минуту)?

Пожалуй главное отличие прода в том, что код запускается не вручную (в целях любопытства: “А что мы интересно получим?”), а для получения стабильного предсказуемого результата в больших количествах. В нашем случае, это мониторинг ленты рсс, т.е. нам нужно будет слать много запросов и получать много ответов. 

Разберу самые необходимые элементы, которые нужно добавить, к этому коду, чтобы он мог стабильно работать, получая несколько фидов или даже несколько фидов одновременно).

Как будет работать код, какой результат нужен

Картинка: Программист, понимающий предназначение написанного им кода

Программист, понимающий предназначение написанного им кода_Kandinsky 2.1
Программист, понимающий предназначение написанного им кода_Kandinsky 2.1

Вопрос номер 1 где и как будет запускаться код. В данном случае, это будет или бесконечный цикл while true в виде сервиса на сервере или запуск по расписанию. В целом, оба подхода требуют одного: нам нужен стабильный перезапуск, чтобы если получим одну ошибку, вся система не падала. Но это несколько забегая вперёд. Сперва разберемся с самым простым.

Тут важно понимать, где и в каких условиях будет запускаться то, что вы пишите.

Проверка 200-ответа

Картинка: Проверка 200-ответа
Проверка 200-ответа_Kandinsky 2.1
Проверка 200-ответа_Kandinsky 2.1

Итак requests.get(url), что не так, и что нужно добавить. 

Начнем с того, что requests.get довольно капризная история, и если вы планируете посылать регулярные запросы к серверу, хорошо бы обрабатывать ответы с кодом отличным от 200. 

Добавляем проверку, строчку будет лучше поместить в функцию.

def get_response(url):
    response = requests.get(url)
    if response.status_code == 200:
        return response
    else:
        return False

Если ответ 200, получаем ответ и двигаемся дальше, если нет, тоже двигаемся дальше с небольшим но.

Логгирование, контроль исполнения и отладка

Картинка: Логгирование, контроль исполнения и отладка
Логгирование, контроль исполнения и отладка_Kandinsky 2.1
Логгирование, контроль исполнения и отладка_Kandinsky 2.1

Если задуматься о ситуации когда сервер вернет не 200-й ответ, тут думаю, должно возникнуть интуитивное желание записать происходящее, с тем чтобы:

  1. Отследить, случаи когда ответ не получен

  2. Понять почему это происходит.

Лучше вынести эту проверку в отдельную функцию:

def response_res(response):
    status = True
    if response.status_code != 200:
        status = False    
    return {'status':status, 'code': response.status_code, 'reason':response.reason}

Функция возвращает словарь, в котором есть код (нужен для проверки следующего шага) и причину (нужна для отладки).

Благодаря такой проверке мы можем получить что-то вроде:

HTTPSConnectionPool(host='riafan.ru', port=443): Max retries exceeded with url: /feed (Caused by ProxyError('Cannot connect to proxy.', timeout('_ssl.c:1114: The handshake operation timed out')))

И думать что с этим делать.

Немного маскировки

Картинка: Немного маскировки
Немного маскировки_Kandinsky 2.1
Немного маскировки_Kandinsky 2.1

Как можно понять из приведенной выше ошибки, автоматический сбор данных не очень приветствуется, даже в таком вроде бы легальном поле как получение рсс-ленты. Поэтому для устойчивого функционирования кода, эту ситуацию тоже нужно учитывать.

Как хорошо известно более опытным товарищам, голый запрос, скорее всего или словит капчу на второй-третий раз или просто будет заблокирован сервером, хорошо бы добавить маскировку и какой-то заголовок. Немного усовершенствуем функцию:

import fake_useragent
import logging

def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {res.status_code}, {s.cookies}")
    return response

Этого конечно же мало, как минимум не хорошо слать запросы с пустыми cookies, referer и так далее, но в этой статье в такие подробности углубляться не буду, главное, чтобы направление дальнейших исследований узких мест было понятным.

Если вообще не сработает

Картинка: try-except декоратор
try-except декоратор_Kandinsky 2.1
try-except декоратор_Kandinsky 2.1

Идём дальше, капризность requests не ограничивается ответами сервера, очень часто она может нам вернуть неприятность в виде ошибки. Если мы будем работать с запросами нескольких лент, ошибка в одной убьёт весь сбор. 

Добавляем, так любимый многими try-except, получаем ещё одну функцию:

def try_request():
    try:
        return get_response(url)
    except:
        return False

Тут видно, что в случае успеха, мы получаем наш response, а вот с исключением возникает, вопрос, как его правильно обработать.

Чтобы не писать дополнительных функций, используем в исключении объект Response() с ответом отличным от 200, и передадим с ним ошибку. Примерно так:

from requests.models import Response

def try_request():
    try:
        return get_response(url)
    except Exception as ex:
        response = Response()
        response.reason = ex    
        response.status_code = 444
        return response

Внесём немного разнообразия в процесс, и сделаем функцию try_request() в виде декоратора.

import sys

def try_request(req):
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            response = Response()
            response.reason = ex
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}

    return wrap


@try_request
def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random    
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response

Опять используем словарь, на случай получения ошибок и их отладки. Если функция не сработает вернётся сгенерированный нами response, который отловит функция по неверному коду ответа.

Всё готово к развертыванию

Картинка: А теперь запустим многопоточный режим
А теперь запустим многопоточный режим_Kandinsky 2.1
А теперь запустим многопоточный режим_Kandinsky 2.1

Узкие места учтены, можно пробовать развернуть скрипт в рабочем варианте. При этом, можно использовать многопоточный режим, и отдельные возможные проблемы не скажутся на общем выполнении. Для одновременных запросов многопоточность вообще хороша, так как экономит много времени на исполнение.

from multiprocessing.pool import ThreadPool

def pool_data(rss_list):
    pool = ThreadPool(len(rss_list))
    try:
        feeds = pool.map(harvest_all, rss_list)
        pool.close()
        return feeds
    except Exception as ex:
        logging.info(f"многопоточность сломалась {ex}")
        return []

Тут всё просто, при помощи ThreadPool создаём количество потоков, равное количеству лент, и всё одновременно отрабатываем.

Ошибок на этой функции ловить не приходилось, возможно try-except тут излишний, но вроде как есть не просит и особо не мешает.

Вроде всё готово..

Запускаем программу... и кладём сервер

Стабильно это работать не будет. Мы забыли указать timeout в s.get!

Если запустить программу в режиме планировщика (например каждые 30 секунд), может возникнуть ситуация, когда ожидается ответ сервера, и уходит новый запрос, потом ещё и ещё, и

out of memory killed process

Добавим таймаут:

response = s.get(url, headers=header, timeout=3)

Ответ 200 не гарантирует, что вы получили, что хотели, ещё одна проверка

И ещё нужно проверить, что в ответе сервера есть, то что вы хотите. Ответ сервера может быть и с кодом 200, но внутри может не оказаться содержания которого вы ждёте. Например капча может быть вполне с 200 кодом или страница блокировки ваших бесконечных запросов без заголовка. 

В нашем случае, мы получаем словарь с определенными полями, поэтому можно сделать универсальную проверку. 

def check_feed(response):
    status = False
    lenta = feedparser.parse(response.text)     
    if lenta['entries']:
        status = True    
    return {'status':status, 'lenta': lenta['entries']} 

Как выглядит строчка requests.get(url) в готовом проекте

Строчка программного кода превращается в Лернейскую гидру_Kandinsky 2.1
Строчка программного кода превращается в Лернейскую гидру_Kandinsky 2.1

Итоговый вид такой: у нас список адресов рсс-лент, который мы передаём в программу, запросы по всем адресам отправляются одновременно, после чего проверяется:

  1. Что запрос вообще отработал без ошибки

  2. Полученный ответ сервера (с фиксацией причин проблем)

  3. Что в ответе есть нужное содержание.

При этом, если какая-то ссылка по какой-то причине не отработает, мы получим код и значение, которое позволит разобраться в причинах неполадок, без прекращения работы скрипта. 

В финале, вот так строчка response = requests.get(url) выглядит в рабочем проекте:

Как выглядит итоговый код
import requests
import feedparser
import sys
from requests.models import Response
import fake_useragent
from multiprocessing.pool import ThreadPool
import logging

# проверка статуса ответа
def response_res(response):
    status = True
    if response.status_code != 200:
        status = False
    return {"status": status, "code": response.status_code, "reason": response.reason}

# try-except декоратор
def try_request(req):
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            response = Response()
            response.reason = ex
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}

    return wrap

# основная функция запроса
@try_request
def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random   
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response

# проверка содержания ответа
def check_feed(response):
    status = False
    lenta = feedparser.parse(response.text)
    if lenta["entries"]:
        status = True
    return {"status": status, "lenta": lenta["entries"]}

# сборная всех проверок и запроса
def harvest_all(url):
    response = get_response(url)
    response_stat = response_res(response["response"])
    feed_res = check_feed(response["response"])
    res_dict = {
        "feed": url,
        "response": response,
        "response_status": response_stat,
        "feed_cheker": feed_res,
    }
    return res_dict

# многопоточная функция
def pool_data(rss_list):
    pool = ThreadPool(len(rss_list))
    try:
        feeds = pool.map(harvest_all, rss_list)
        pool.close()
        return feeds
    except Exception as ex:
        logging.exception(f"многопоточность сломалась")
        return []


def main():
    rss_list = [
        "https://feed1.xml",
        "https://feed2.xml",
        "https://feed3.xml",
    ]
    feeds = pool_data(rss_list)
    for item in feeds:
        if item["feed_cheker"]["status"]:
            lenta = feedparser.parse(item["response"]["response"].text)
            for titles in lenta["entries"]:
                print(titles["title"])


if __name__ == "__main__":
    main()

Чтобы код использовался в рабочем проекте, его желательно продумывать схожим образом, на все возможные случаи, которые очевидные и не совсем.

PS

Как ещё больше углубиться в проблему и довести до ума код очень рекомендую к прочтению комментарий ув. andreymal

Комментарии (12)


  1. andreymal
    27.06.2023 10:58
    +12

    Как на самом деле выглядит итоговый код

    import asyncio
    import logging
    
    import aiohttp
    import feedparser
    
    async def harvest_all(url: str) -> None:
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as response:
                    response.raise_for_status()
                    raw_feed = await response.text()
        except Exception:
            logging.exception("Failed to get feed %s", url)
            return
    
        try:
            feed = feedparser.parse(raw_feed)
            process_feed(url, feed)
        except Exception:
            logging.exception("Failed to process feed %s", url)
    
    def process_feed(url: str, feed: feedparser.FeedParserDict) -> None:
        if not feed["entries"]:
            logging.info("Feed %s has no entries", url)
            return
    
        for entry in feed["entries"]:
            logging.info("%s", entry["title"])
    
    async def main() -> None:
        logging.basicConfig(level=logging.INFO)
    
        rss_list = [
            "https://lenta.ru/rss",
            "https://habr.com/ru/rss/all/",
        ]
    
        while True:
            tasks = {asyncio.create_task(harvest_all(url)) for url in rss_list}
            await asyncio.wait(tasks)
            await asyncio.sleep(60)
    
    if __name__ == "__main__":
        asyncio.run(main())


  1. Eefrit
    27.06.2023 10:58

    Кажется, в самом первом примере проверки 200-го ответа вы перепутали равно с неравно :)


    1. Story-teller Автор
      27.06.2023 10:58

      Точно перепутал, поправил, спасибо!


  1. andreymal
    27.06.2023 10:58
    +12

    хорошо бы обрабатывать ответы с кодом отличным от 200. Добавляем проверку, строчку будет лучше поместить в функцию.

    Во-первых, зачем ради одной самоочевидной строки заводить целую функцию? Во-вторых, в requests и других подобных библиотеках уже есть встроенная функция raise_for_status, которая выбрасывает исключение при статусах 4xx и 5xx


    return False

    Пхпшники покусали? Во-первых, выбрасывать исключения при ошибках в питоне абсолютно нормально, та же raise_for_status так делает. Во-вторых, если есть аллергия на исключения, то можно хотя бы None вернуть, это всё ещё менее странно чем False


    Логгирование, контроль исполнения и отладка

    Всю эту информацию можно достать напрямую из объекта response или из возникшего исключения, функция response_res в показанном здесь виде не имеет никакого практического смысла. Кроме того, response_res ломает статическую типизацию, потому что заменяет объекты с аннотациями типов на нетипизированный словарь — для производства и будущей поддержки кода это ОЧЕНЬ плохо


    автоматический сбор данных не очень приветствуется, даже в таком вроде бы легальном поле как получение рсс-ленты.

    Бред, RSS по сути своей предназначен именно для автоматизированного сбора данных


    словит капчу на второй-третий раз

    Капча на RSS — опять бред


    fake_useragent.UserAgent().random

    И здесь сразу два подозрительных момента. Во-первых, клиент, меняющий свой юзер-агент при каждом запросе — это в принципе странно, а во-вторых, браузеры не умеют читать RSS (за редким исключением в виде браузерных расширений и прочей мелочи) и значит браузерного юзер-агента здесь быть в принципе не должно


    не хорошо слать запросы с пустыми cookies, referer и так далее

    Ничего из перечисленного быть в принципе не должно, это же RSS


    except:

    … и получаем невозможность прервать работу программы из-за перехваченных KeyboardInterrupt и SystemExit. К счастью, в следующем примере кода это уже исправлено, хоть и без пояснений


    а вот с исключением возникает, вопрос, как его правильно обработать.

    Точно не подменой на фейковый Response. Опять аллергия на исключения?


    Для одновременных запросов многопоточность вообще хороша

    Для одновременных запросов многопоточность вообще не подходит, потому что наплодит кучу жрущих ресурсы потоков, которые при этом будут простаивать без дела в ожидании ответа от сервера. Для io-bound задач специально изобрели асинхронность, которая замечательно работает в одном потоке


    logging.info(f"многопоточность сломалась {ex}")

    info для критических ошибок, скрытие информации о стеке, скрытие информации о типе ошибки, ужас… Есть же специальный logging.exception для такого


    Если запустить программу в режиме планировщика (например каждые 30 секунд)

    Нормальный планировщик должен дожидаться, когда предыдущий запуск завершится


    Как выглядит итоговый код

    feedparser.parse зачем-то вызывается два раза, пустая трата процессора. Причём во второй раз не перехватываются возможные исключения (а вдруг внутри feedparser баги есть)


    Ещё feedparser сам по себе плох тем, что возвращает результат парсинга в виде нетипизированного словаря, но так как я не в курсе, есть ли достойные альтернативы, то ладно, пусть пока будет


    Итого: меняем многопоточность на асинхронность, избавляемся от аллергии на исключения, возвращаем нормальные объекты вместо нетипизированного словаря, выкидываем бесполезный юзер-агент — и получаем гораздо более адекватный и при этом более компактный код:


    Как на самом деле выглядит итоговый код
    import asyncio
    import logging
    
    import aiohttp
    import feedparser
    
    async def harvest_all(url: str) -> None:
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as response:
                    response.raise_for_status()
                    raw_feed = await response.text()
        except Exception:
            logging.exception("Failed to get feed %s", url)
            return
    
        try:
            feed = feedparser.parse(raw_feed)
            process_feed(url, feed)
        except Exception:
            logging.exception("Failed to process feed %s", url)
    
    def process_feed(url: str, feed: feedparser.FeedParserDict) -> None:
        if not feed["entries"]:
            logging.info("Feed %s has no entries", url)
            return
    
        for entry in feed["entries"]:
            logging.info("%s", entry["title"])
    
    async def main() -> None:
        logging.basicConfig(level=logging.INFO)
    
        rss_list = [
            "https://lenta.ru/rss",
            "https://habr.com/ru/rss/all/",
        ]
    
        while True:
            tasks = {asyncio.create_task(harvest_all(url)) for url in rss_list}
            await asyncio.wait(tasks)
            await asyncio.sleep(60)
    
    if __name__ == "__main__":
        asyncio.run(main())


  1. Story-teller Автор
    27.06.2023 10:58
    +3

    Спасибо за квалифицированный комментарий, очень полезные замечания и доработанный код!


    1. 9982th
      27.06.2023 10:58
      +1

      Все-таки у оригинального кода есть одно преимущество: его легче понять и доработать. Обратите внимание, как много вашего кода было переиспользовано в предложенном варианте (одно название функции, если я не ошибаюсь?). Часто попытки разобраться как исправить/доработать такой вот продакшн-код занимают больше времени и сил, чем используя типовые паттерны добавить простую обработку ошибок к ничем незамутненному happy path.


      1. Story-teller Автор
        27.06.2023 10:58

        Критика конструктивная, особенно по части нетипизированных словарей) и ряд подходов тоже интересные, определенно стоить рассмотреть внедрение. В тоже время предложенный в статье вариант (с учётом отмеченных недостатков) вполне рабочий. По части решения многопоточности, изучу подробнее, тут аргументы не самые убедительные приведены, но с предметом подробно не знаком. В любом случае, пользы от материала получилось много больше чем ожидалось)


  1. klopp_spb
    27.06.2023 10:58

    Поюс неплохо было бы If-Modified-Since обрабатывать.


  1. Bessnov
    27.06.2023 10:58
    +1

    "Чтобы код использовался в рабочем проекте..." - наверно имеет смысл заранее составить чёткое ТЗ что от этого кода ожидается и какие ситуации нужно обрабатывать. Тогда программисту не придётся по ходу дела что-то выдумывать.


    1. Story-teller Автор
      27.06.2023 10:58

      В начале материала задача ставится, и дальше идёт объяснение подхода к её решению

      нам нужно получать несколько рсс-лент в плотном режиме (например каждую минуту)


  1. dx-77
    27.06.2023 10:58

    Ещё можно так писать

    try:

    response = requests.get()

    if response.ok:

    ...

    except requests.RequestException as err:

    ...


  1. atshaman
    27.06.2023 10:58

    Перечень фидов в коде, а не в отдельном файле, отсутствие самоминимального cli - такой хоккей нам не нужен...


    1. Story-teller Автор
      27.06.2023 10:58

      а какой cli может быть для приведенного случая? (фиды в коде, а не в отдельном файле, для наглядности)