Разберу простую задачу, получение rss-ленты, и то, чем будет отличаться код, который просто получает ленту, от того, который собственно используется в производстве.
Надеюсь материал будет полезен начинающим программистам и покажет, как примерно должна осуществляться разработка с прицелом на получение результата применимого в проектах.
Всё же работает, почему не берут?
Начнем с конфликта: решение задачи получения rss-ленты выглядит очень просто, вот так:
import requests
import feedparser
response = requests.get('https://lenta.ru/rss')
lenta = feedparser.parse(response.text)
for items in lenta['entries']:
print(items["title"])
Для одноразового удовлетворения любопытства вполне достаточно, но что если нам нужно получать несколько рсс-лент в плотном режиме (например каждую минуту)?
Пожалуй главное отличие прода в том, что код запускается не вручную (в целях любопытства: “А что мы интересно получим?”), а для получения стабильного предсказуемого результата в больших количествах. В нашем случае, это мониторинг ленты рсс, т.е. нам нужно будет слать много запросов и получать много ответов.
Разберу самые необходимые элементы, которые нужно добавить, к этому коду, чтобы он мог стабильно работать, получая несколько фидов или даже несколько фидов одновременно).
Как будет работать код, какой результат нужен
Картинка: Программист, понимающий предназначение написанного им кода
Вопрос номер 1 где и как будет запускаться код. В данном случае, это будет или бесконечный цикл while true в виде сервиса на сервере или запуск по расписанию. В целом, оба подхода требуют одного: нам нужен стабильный перезапуск, чтобы если получим одну ошибку, вся система не падала. Но это несколько забегая вперёд. Сперва разберемся с самым простым.
Тут важно понимать, где и в каких условиях будет запускаться то, что вы пишите.
Проверка 200-ответа
Картинка: Проверка 200-ответа
Итак requests.get(url), что не так, и что нужно добавить.
Начнем с того, что requests.get довольно капризная история, и если вы планируете посылать регулярные запросы к серверу, хорошо бы обрабатывать ответы с кодом отличным от 200.
Добавляем проверку, строчку будет лучше поместить в функцию.
def get_response(url):
response = requests.get(url)
if response.status_code == 200:
return response
else:
return False
Если ответ 200, получаем ответ и двигаемся дальше, если нет, тоже двигаемся дальше с небольшим но.
Логгирование, контроль исполнения и отладка
Картинка: Логгирование, контроль исполнения и отладка
Если задуматься о ситуации когда сервер вернет не 200-й ответ, тут думаю, должно возникнуть интуитивное желание записать происходящее, с тем чтобы:
Отследить, случаи когда ответ не получен
Понять почему это происходит.
Лучше вынести эту проверку в отдельную функцию:
def response_res(response):
status = True
if response.status_code != 200:
status = False
return {'status':status, 'code': response.status_code, 'reason':response.reason}
Функция возвращает словарь, в котором есть код (нужен для проверки следующего шага) и причину (нужна для отладки).
Благодаря такой проверке мы можем получить что-то вроде:
HTTPSConnectionPool(host='riafan.ru', port=443): Max retries exceeded with url: /feed (Caused by ProxyError('Cannot connect to proxy.', timeout('_ssl.c:1114: The handshake operation timed out')))
И думать что с этим делать.
Немного маскировки
Картинка: Немного маскировки
Как можно понять из приведенной выше ошибки, автоматический сбор данных не очень приветствуется, даже в таком вроде бы легальном поле как получение рсс-ленты. Поэтому для устойчивого функционирования кода, эту ситуацию тоже нужно учитывать.
Как хорошо известно более опытным товарищам, голый запрос, скорее всего или словит капчу на второй-третий раз или просто будет заблокирован сервером, хорошо бы добавить маскировку и какой-то заголовок. Немного усовершенствуем функцию:
import fake_useragent
import logging
def get_response(url):
s = requests.Session()
user = fake_useragent.UserAgent().random
header = {"user-agent": user}
response = s.get(url, headers=header)
logging.info(f"{url}, {res.status_code}, {s.cookies}")
return response
Этого конечно же мало, как минимум не хорошо слать запросы с пустыми cookies, referer и так далее, но в этой статье в такие подробности углубляться не буду, главное, чтобы направление дальнейших исследований узких мест было понятным.
Если вообще не сработает
Картинка: try-except декоратор
Идём дальше, капризность requests не ограничивается ответами сервера, очень часто она может нам вернуть неприятность в виде ошибки. Если мы будем работать с запросами нескольких лент, ошибка в одной убьёт весь сбор.
Добавляем, так любимый многими try-except, получаем ещё одну функцию:
def try_request():
try:
return get_response(url)
except:
return False
Тут видно, что в случае успеха, мы получаем наш response, а вот с исключением возникает, вопрос, как его правильно обработать.
Чтобы не писать дополнительных функций, используем в исключении объект Response() с ответом отличным от 200, и передадим с ним ошибку. Примерно так:
from requests.models import Response
def try_request():
try:
return get_response(url)
except Exception as ex:
response = Response()
response.reason = ex
response.status_code = 444
return response
Внесём немного разнообразия в процесс, и сделаем функцию try_request() в виде декоратора.
import sys
def try_request(req):
def wrap(url):
status = False
try:
response = req(url)
error = 0
status = True
except Exception as ex:
response = Response()
response.reason = ex
response.status_code = 444
error = sys.exc_info()[1]
return {"status": status, "response": response, "error": error}
return wrap
@try_request
def get_response(url):
s = requests.Session()
user = fake_useragent.UserAgent().random
header = {"user-agent": user}
response = s.get(url, headers=header)
logging.info(f"{url}, {response.status_code}, {s.cookies}")
return response
Опять используем словарь, на случай получения ошибок и их отладки. Если функция не сработает вернётся сгенерированный нами response, который отловит функция по неверному коду ответа.
Всё готово к развертыванию
Картинка: А теперь запустим многопоточный режим
Узкие места учтены, можно пробовать развернуть скрипт в рабочем варианте. При этом, можно использовать многопоточный режим, и отдельные возможные проблемы не скажутся на общем выполнении. Для одновременных запросов многопоточность вообще хороша, так как экономит много времени на исполнение.
from multiprocessing.pool import ThreadPool
def pool_data(rss_list):
pool = ThreadPool(len(rss_list))
try:
feeds = pool.map(harvest_all, rss_list)
pool.close()
return feeds
except Exception as ex:
logging.info(f"многопоточность сломалась {ex}")
return []
Тут всё просто, при помощи ThreadPool создаём количество потоков, равное количеству лент, и всё одновременно отрабатываем.
Ошибок на этой функции ловить не приходилось, возможно try-except тут излишний, но вроде как есть не просит и особо не мешает.
Вроде всё готово..
Запускаем программу... и кладём сервер
Стабильно это работать не будет. Мы забыли указать timeout в s.get!
Если запустить программу в режиме планировщика (например каждые 30 секунд), может возникнуть ситуация, когда ожидается ответ сервера, и уходит новый запрос, потом ещё и ещё, и
out of memory killed process
Добавим таймаут:
response = s.get(url, headers=header, timeout=3)
Ответ 200 не гарантирует, что вы получили, что хотели, ещё одна проверка
И ещё нужно проверить, что в ответе сервера есть, то что вы хотите. Ответ сервера может быть и с кодом 200, но внутри может не оказаться содержания которого вы ждёте. Например капча может быть вполне с 200 кодом или страница блокировки ваших бесконечных запросов без заголовка.
В нашем случае, мы получаем словарь с определенными полями, поэтому можно сделать универсальную проверку.
def check_feed(response):
status = False
lenta = feedparser.parse(response.text)
if lenta['entries']:
status = True
return {'status':status, 'lenta': lenta['entries']}
Как выглядит строчка requests.get(url) в готовом проекте
Итоговый вид такой: у нас список адресов рсс-лент, который мы передаём в программу, запросы по всем адресам отправляются одновременно, после чего проверяется:
Что запрос вообще отработал без ошибки
Полученный ответ сервера (с фиксацией причин проблем)
Что в ответе есть нужное содержание.
При этом, если какая-то ссылка по какой-то причине не отработает, мы получим код и значение, которое позволит разобраться в причинах неполадок, без прекращения работы скрипта.
В финале, вот так строчка response = requests.get(url) выглядит в рабочем проекте:
Как выглядит итоговый код
import requests
import feedparser
import sys
from requests.models import Response
import fake_useragent
from multiprocessing.pool import ThreadPool
import logging
# проверка статуса ответа
def response_res(response):
status = True
if response.status_code != 200:
status = False
return {"status": status, "code": response.status_code, "reason": response.reason}
# try-except декоратор
def try_request(req):
def wrap(url):
status = False
try:
response = req(url)
error = 0
status = True
except Exception as ex:
response = Response()
response.reason = ex
response.status_code = 444
error = sys.exc_info()[1]
return {"status": status, "response": response, "error": error}
return wrap
# основная функция запроса
@try_request
def get_response(url):
s = requests.Session()
user = fake_useragent.UserAgent().random
header = {"user-agent": user}
response = s.get(url, headers=header)
logging.info(f"{url}, {response.status_code}, {s.cookies}")
return response
# проверка содержания ответа
def check_feed(response):
status = False
lenta = feedparser.parse(response.text)
if lenta["entries"]:
status = True
return {"status": status, "lenta": lenta["entries"]}
# сборная всех проверок и запроса
def harvest_all(url):
response = get_response(url)
response_stat = response_res(response["response"])
feed_res = check_feed(response["response"])
res_dict = {
"feed": url,
"response": response,
"response_status": response_stat,
"feed_cheker": feed_res,
}
return res_dict
# многопоточная функция
def pool_data(rss_list):
pool = ThreadPool(len(rss_list))
try:
feeds = pool.map(harvest_all, rss_list)
pool.close()
return feeds
except Exception as ex:
logging.exception(f"многопоточность сломалась")
return []
def main():
rss_list = [
"https://feed1.xml",
"https://feed2.xml",
"https://feed3.xml",
]
feeds = pool_data(rss_list)
for item in feeds:
if item["feed_cheker"]["status"]:
lenta = feedparser.parse(item["response"]["response"].text)
for titles in lenta["entries"]:
print(titles["title"])
if __name__ == "__main__":
main()
Чтобы код использовался в рабочем проекте, его желательно продумывать схожим образом, на все возможные случаи, которые очевидные и не совсем.
PS
Как ещё больше углубиться в проблему и довести до ума код очень рекомендую к прочтению комментарий ув. andreymal
Комментарии (12)
Eefrit
27.06.2023 10:58Кажется, в самом первом примере проверки 200-го ответа вы перепутали равно с неравно :)
andreymal
27.06.2023 10:58+12хорошо бы обрабатывать ответы с кодом отличным от 200. Добавляем проверку, строчку будет лучше поместить в функцию.
Во-первых, зачем ради одной самоочевидной строки заводить целую функцию? Во-вторых, в requests и других подобных библиотеках уже есть встроенная функция
raise_for_status
, которая выбрасывает исключение при статусах 4xx и 5xxreturn False
Пхпшники покусали? Во-первых, выбрасывать исключения при ошибках в питоне абсолютно нормально, та же
raise_for_status
так делает. Во-вторых, если есть аллергия на исключения, то можно хотя бы None вернуть, это всё ещё менее странно чем FalseЛоггирование, контроль исполнения и отладка
Всю эту информацию можно достать напрямую из объекта response или из возникшего исключения, функция
response_res
в показанном здесь виде не имеет никакого практического смысла. Кроме того,response_res
ломает статическую типизацию, потому что заменяет объекты с аннотациями типов на нетипизированный словарь — для производства и будущей поддержки кода это ОЧЕНЬ плохоавтоматический сбор данных не очень приветствуется, даже в таком вроде бы легальном поле как получение рсс-ленты.
Бред, RSS по сути своей предназначен именно для автоматизированного сбора данных
словит капчу на второй-третий раз
Капча на RSS — опять бред
fake_useragent.UserAgent().random
И здесь сразу два подозрительных момента. Во-первых, клиент, меняющий свой юзер-агент при каждом запросе — это в принципе странно, а во-вторых, браузеры не умеют читать RSS (за редким исключением в виде браузерных расширений и прочей мелочи) и значит браузерного юзер-агента здесь быть в принципе не должно
не хорошо слать запросы с пустыми cookies, referer и так далее
Ничего из перечисленного быть в принципе не должно, это же RSS
except:
… и получаем невозможность прервать работу программы из-за перехваченных KeyboardInterrupt и SystemExit. К счастью, в следующем примере кода это уже исправлено, хоть и без пояснений
а вот с исключением возникает, вопрос, как его правильно обработать.
Точно не подменой на фейковый Response. Опять аллергия на исключения?
Для одновременных запросов многопоточность вообще хороша
Для одновременных запросов многопоточность вообще не подходит, потому что наплодит кучу жрущих ресурсы потоков, которые при этом будут простаивать без дела в ожидании ответа от сервера. Для io-bound задач специально изобрели асинхронность, которая замечательно работает в одном потоке
logging.info(f"многопоточность сломалась {ex}")
info для критических ошибок, скрытие информации о стеке, скрытие информации о типе ошибки, ужас… Есть же специальный
logging.exception
для такогоЕсли запустить программу в режиме планировщика (например каждые 30 секунд)
Нормальный планировщик должен дожидаться, когда предыдущий запуск завершится
Как выглядит итоговый код
feedparser.parse
зачем-то вызывается два раза, пустая трата процессора. Причём во второй раз не перехватываются возможные исключения (а вдруг внутри feedparser баги есть)Ещё feedparser сам по себе плох тем, что возвращает результат парсинга в виде нетипизированного словаря, но так как я не в курсе, есть ли достойные альтернативы, то ладно, пусть пока будет
Итого: меняем многопоточность на асинхронность, избавляемся от аллергии на исключения, возвращаем нормальные объекты вместо нетипизированного словаря, выкидываем бесполезный юзер-агент — и получаем гораздо более адекватный и при этом более компактный код:
Как на самом деле выглядит итоговый кодimport asyncio import logging import aiohttp import feedparser async def harvest_all(url: str) -> None: try: timeout = aiohttp.ClientTimeout(total=10) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as response: response.raise_for_status() raw_feed = await response.text() except Exception: logging.exception("Failed to get feed %s", url) return try: feed = feedparser.parse(raw_feed) process_feed(url, feed) except Exception: logging.exception("Failed to process feed %s", url) def process_feed(url: str, feed: feedparser.FeedParserDict) -> None: if not feed["entries"]: logging.info("Feed %s has no entries", url) return for entry in feed["entries"]: logging.info("%s", entry["title"]) async def main() -> None: logging.basicConfig(level=logging.INFO) rss_list = [ "https://lenta.ru/rss", "https://habr.com/ru/rss/all/", ] while True: tasks = {asyncio.create_task(harvest_all(url)) for url in rss_list} await asyncio.wait(tasks) await asyncio.sleep(60) if __name__ == "__main__": asyncio.run(main())
Story-teller Автор
27.06.2023 10:58+3Спасибо за квалифицированный комментарий, очень полезные замечания и доработанный код!
9982th
27.06.2023 10:58+1Все-таки у оригинального кода есть одно преимущество: его легче понять и доработать. Обратите внимание, как много вашего кода было переиспользовано в предложенном варианте (одно название функции, если я не ошибаюсь?). Часто попытки разобраться как исправить/доработать такой вот продакшн-код занимают больше времени и сил, чем используя типовые паттерны добавить простую обработку ошибок к ничем незамутненному happy path.
Story-teller Автор
27.06.2023 10:58Критика конструктивная, особенно по части нетипизированных словарей) и ряд подходов тоже интересные, определенно стоить рассмотреть внедрение. В тоже время предложенный в статье вариант (с учётом отмеченных недостатков) вполне рабочий. По части решения многопоточности, изучу подробнее, тут аргументы не самые убедительные приведены, но с предметом подробно не знаком. В любом случае, пользы от материала получилось много больше чем ожидалось)
Bessnov
27.06.2023 10:58+1"Чтобы код использовался в рабочем проекте..." - наверно имеет смысл заранее составить чёткое ТЗ что от этого кода ожидается и какие ситуации нужно обрабатывать. Тогда программисту не придётся по ходу дела что-то выдумывать.
Story-teller Автор
27.06.2023 10:58В начале материала задача ставится, и дальше идёт объяснение подхода к её решению
нам нужно получать несколько рсс-лент в плотном режиме (например каждую минуту)
dx-77
27.06.2023 10:58Ещё можно так писать
try:
response = requests.get()
if response.ok:
...
except requests.RequestException as err:
...
atshaman
27.06.2023 10:58Перечень фидов в коде, а не в отдельном файле, отсутствие самоминимального cli - такой хоккей нам не нужен...
Story-teller Автор
27.06.2023 10:58а какой cli может быть для приведенного случая? (фиды в коде, а не в отдельном файле, для наглядности)
andreymal
Как на самом деле выглядит итоговый код