Рассмотрим многопоточность как один из подходов, позволяющий быстрее решать задачи, связанные с вводом-выводом, и на его основе напишем парсер.
Задача сбора данных из открытых источников возникает довольно часто, а основным фактором, влияющим на скорость её решения, выступает объём данных, порождающий большее количество обращений к источнику по сети, причём основное количество времени, затрачиваемого на работу с сетью, занимает ожидание ответа от источника.
Уменьшить время ожидания ответа при работе с сетью можно используя подходящие программные модели. Например, в случае многопоточного выполнения, подзадачи программы могут быть распределены между потоками.
Соберём достаточно большое количество стихов поэтов-классиков из открытого источника, оценим затраченное время и ускорим процесс сбора. Использовать будем Python, модули: bs4, json, os, queue, requests, time и threading.
Переходим по первой ссылке из поисковика:
После изучения внешнего вида страницы, можно сделать вывод о том, что сортировки по авторам нет и произведения одного автора могут встретиться как на первой, так и на последней страницах, учтём это.
Открываем инструменты разработчика, идём на вкладку «сеть», открываем страницу любого автора:
Нашли адрес, при отправке запроса на который, вернётся json с произведениями определённого параметрами запроса автора.
Импорты:
from bs4 import BeautifulSoup
import json
import os
import requests
import time
В рамках функции, выгружающей произведения по одному автору, пройдём по нескольким ссылкам, чтобы набрать cookie, изменим стандартные для модуля requests заголовки, будем циклично отправлять запросы и разбирать json-ответы, сохраняя их в файл:
def getForAuthor(sess, author):
urls = ["https://www.google.ru/", "https://www.youtube.com/"]
for i in urls:
sess.get(url=i)
headers = {
"accept": "*/*",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
}
result = {}
page = 1
while True:
url = f"https://www.culture.ru/_next/data/i_YUtTpPEY7K2ap4pw3Y4/literature/poems/{author}.json?pathParams=author-sergei-esenin"
params = {
"pathParams": author,
"page": page
}
while True:
try:
res = sess.get(
url=url,
params=params,
headers=headers
)
break
except Exception as ex:
print(ex)
js = res.json()
maxPage = js["pageProps"]["pagination"]["total"]
for item in js["pageProps"]["poems"]:
result[item["_id"]] = {
"id": item["_id"],
"title": item["title"],
"text": item["text"]
}
if page == maxPage:
break
page += 1
with open(f"./data/{author}.json", "w", encoding="utf-8") as file:
file.write(json.dumps(result, ensure_ascii=False, indent=4))
Запустим:
if __name__ == "__main__":
start = 1
stop = 2
sess = requests.Session()
authors = []
maxPage = int(getMaxPage(sess=sess))
print(maxPage)
start = time.time(
for page in range(start, stop)):
print(page)
url = f"https://www.culture.ru/literature/poems?page={page}"
while True:
try:
res = sess.get(url=url)
break
except Exception as ex:
print(ex)
soup = BeautifulSoup(res.text, "html.parser")
athrs = soup.find_all(attrs={"class": "_6unAn"})
for author in athrs:
author = author.get("href").replace("/literature/poems/", "")
os.system("cls")
if author not in authors:
authors.append(author)
print(len(authors))
for a in authors:
os.system("cls")
print(a)
getForAuthor(sess = sess, author = a)
print("Done for", time.time() - start, "seconds", len(authors))
Результаты сохранились:
Заглянем в один из файлов, чтобы убедиться, что всё записано так, как ожидалось:
Как можно было заметить ранее, результат выгружен только по шестнадцати авторам с первой страницы сайта, и это заняло около семи минут.
Почему так долго? И что происходит при отправке запроса дольше всего? Ожидание.
Для того, чтобы повысить производительность воспользуемся модулем threading: с его помощью запустим потоки, выгружающие произведения по одному автору в одном процессе. При переходе одного потока в состояние ожидания, будет выполняться другой, таким образом время ожидания уменьшится. Для обмена данными с потоком будет использоваться очередь из модуля queue.
Частично перепишем предыдущий скрипт.
Импорты:
from bs4 import BeautifulSoup
import json
import os
from queue import Queue
import requests
import time
import threading
Функция, которая будет работать в потоке:
def getForAuthor(task):
sess = requests.Session()
urls = ["https://www.google.ru/", "https://www.youtube.com/"]
for i in urls:
sess.get(url=i)
headers = {
"accept": "*/*",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
}
result = {}
page = 1
if not task.empty():
author = task.get()
page = 1
while True:
url = f"https://www.culture.ru/_next/data/i_YUtTpPEY7K2ap4pw3Y4/literature/poems/{author}.json?pathParams=author-sergei-esenin"
params = {
"pathParams": author,
"page": page
}
while True:
try:
res = sess.get(
url=url,
params=params,
headers=headers
)
break
except Exception as ex:
print(ex)
js = res.json()
maxPage = js["pageProps"]["pagination"]["total"]
for item in js["pageProps"]["poems"]:
result[item["_id"]] = {
"id": item["_id"],
"title": item["title"],
"text": item["text"]
}
if page == maxPage:
break
page += 1
with open(f"./data/{author}.json", "w", encoding="utf-8") as file:
file.write(json.dumps(result, ensure_ascii=False, indent=4))
Запустим:
if __name__ == "__main__":
start = 1
stop = 2
sess = requests.Session()
authors = []
# создание экземпляра класса очереди
task = Queue()
start = time.time()
for page in range(start, stop):
print(page)
url = f"https://www.culture.ru/literature/poems?page={page}"
while True:
try:
res = sess.get(url=url)
break
except Exception as ex:
print(ex)
soup = BeautifulSoup(res.text, "html.parser")
athrs = soup.find_all(attrs={"class": "_6unAn"})
for author in athrs:
author = author.get("href").replace("/literature/poems/", "")
os.system("cls")
if author not in authors:
authors.append(author)
# заполнение очереди
for a in authors:
task.put(a)
# запуск потоков
for _ in range(len(authors)):
threading.Thread(target=getForAuthor, args=(task,)).start()
# цикл с условием останова для родительского потока
while True:
if threading.active_count() == 1:
break
print("Threading done for", time.time() - start, "seconds", len(authors))
Вывод по окончанию работы, одна страница, те же шестнадцать авторов:
По полученным результатам делаем вывод, что достали те же самые данные, но, при этом, сократили время исполнения почти в два раза. А можно ещё быстрее? Можно, если более детально продумать процессы сбора и обработки данных из очереди.
С помощью модуля threading можно повысить производительность не только при работе с сетью, но и в любых задачах, связанных с вводом-выводом.
Комментарии (16)
Drakosh
14.06.2022 09:57+3На текущий момент более актуален AsyncIO.
NewTechAudit Автор
16.06.2022 11:56Соглашусь, asyncio действительно актуален, threading рассматривается как один из подходов, позволяющий снизить время выполнения программ, решающих задачи, связанные с вводом-выводом
Yuribtr
14.06.2022 11:19Ввиду наличия asyncio и GIL, в классическом CPython не вижу смысла использовать threading. Потому как с threading мы получаем небольшое торможение работы программы из за переключения потоков, а также получаем неудобства связанные с необходимостью синхронизации доступа к общим переменным.
NewTechAudit Автор
16.06.2022 11:56Здесь threading рассматривается как один из подходов, позволяющий снизить время выполнения программ, решающих задачи, связанные с вводом-выводом
Yuribtr
16.06.2022 23:40Да, один из подходов, не спорю.
Но использовать его новичкам не рекомендуется из-за того что легко получить "race condition". А для продвинутых такой подход не нужен, так как есть Asyncio и Multiprocessing. Итого применение threading имеет смысл в версиях Python без GIL.
Мне кажется в статье нехватает такого вот обьяснения.
kAIST
14.06.2022 11:46А если у вас 100-500-1000 заданий, то создаваться столько же потоков?
Тогда уж multiprocessing с Pool использовать, и wrapper для очередей из того же модуля.
NewTechAudit Автор
16.06.2022 11:56Количество потоков не обязательно равно количеству задач
kAIST
16.06.2022 13:20У вас в коде оно равно. Если будет больше авторов, то будет будет больше потоков. Почему бы сразу это не учитывать и учить делать правильно? Плюс ещё скорее всего ядро процессора будет загружено на 100% из за того что в главном потоке у вас while шпарит на всю катушку. Хотя бы какой то sleep поставьте чтобы ядро не загружалось бесполезной работой.
hobogene
Так в потоке или в процессе?
zuko3d
А мы под линуксом или под виндой? :)
hobogene
А для Python разве есть разница в данном случае? Я просто спрашиваю, без намеков :-)
zuko3d
Разница есть для понятия потока и процесса. В линуксе потоки являются процессами, в винде - это отдельные сущности.
hobogene
Да нет, это-то я в курсе. И, в силу возраста, даже историю вопроса немного помню. Просто вот поток в Python не совсем то же, что поток в операционке (любой).
NewTechAudit Автор
Здесь речь идёт о том, что все создаваемые потоки работают в рамках одного процесса – исполняемого файла
hobogene
Похоже, запятая пропущена. Понятно. Спасибо за разъяснение.