Рассмотрим многопоточность как один из подходов, позволяющий быстрее решать задачи, связанные с вводом-выводом, и на его основе напишем парсер.

Задача сбора данных из открытых источников возникает довольно часто, а основным фактором, влияющим на скорость её решения, выступает объём данных, порождающий большее количество обращений к источнику по сети, причём основное количество времени, затрачиваемого на работу с сетью, занимает ожидание ответа от источника.

Уменьшить время ожидания ответа при работе с сетью можно используя подходящие программные модели. Например, в случае многопоточного выполнения, подзадачи программы могут быть распределены между потоками.

Соберём достаточно большое количество стихов поэтов-классиков из открытого источника, оценим затраченное время и ускорим процесс сбора. Использовать будем Python, модули: bs4, json, os, queue, requests, time и threading.

Переходим по первой ссылке из поисковика:

После изучения внешнего вида страницы, можно сделать вывод о том, что сортировки по авторам нет и произведения одного автора могут встретиться как на первой, так и на последней страницах, учтём это.

Открываем инструменты разработчика, идём на вкладку «сеть», открываем страницу любого автора:

Нашли адрес, при отправке запроса на который, вернётся json с произведениями определённого параметрами запроса автора.

Импорты:

from bs4 import BeautifulSoup
import json
import os
import requests
import time

В рамках функции, выгружающей произведения по одному автору, пройдём по нескольким ссылкам, чтобы набрать cookie, изменим стандартные для модуля requests заголовки, будем циклично отправлять запросы и разбирать json-ответы, сохраняя их в файл:

def getForAuthor(sess, author):
    urls = ["https://www.google.ru/", "https://www.youtube.com/"]

    for i in urls:
        sess.get(url=i)

    headers = {
        "accept": "*/*",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
    }

    result = {}
    page = 1
    while True:
        url = f"https://www.culture.ru/_next/data/i_YUtTpPEY7K2ap4pw3Y4/literature/poems/{author}.json?pathParams=author-sergei-esenin"
        params = {
            "pathParams": author,
            "page": page
        }

        while True:
            try:
                res = sess.get(
                    url=url,
                    params=params,
                    headers=headers
                )
                break
            except Exception as ex:
                print(ex)
        js = res.json()
        maxPage = js["pageProps"]["pagination"]["total"]
        for item in js["pageProps"]["poems"]:
            result[item["_id"]] = {
                "id": item["_id"],
                "title": item["title"],
                "text": item["text"]
            }

        if page == maxPage:
            break
        page += 1
        
    with open(f"./data/{author}.json", "w", encoding="utf-8") as file:
        file.write(json.dumps(result, ensure_ascii=False, indent=4))

Запустим:

if __name__ == "__main__":
    start = 1
    stop = 2
    sess = requests.Session()
    authors = []
    maxPage = int(getMaxPage(sess=sess))
    print(maxPage)
    start = time.time(

    for page in range(start, stop)):
        print(page)
        url = f"https://www.culture.ru/literature/poems?page={page}"

        while True:
            try:
                res = sess.get(url=url)
                break
            except Exception as ex:
                print(ex)

        soup = BeautifulSoup(res.text, "html.parser")

        athrs = soup.find_all(attrs={"class": "_6unAn"})
    
        for author in athrs:
            author = author.get("href").replace("/literature/poems/", "")
            os.system("cls")
            
            if author not in authors:
                authors.append(author)
            print(len(authors))
        
    for a in authors:
        os.system("cls")
        print(a)
        getForAuthor(sess = sess, author = a)
    
    print("Done for", time.time() - start, "seconds", len(authors))

Результаты сохранились:

Заглянем в один из файлов, чтобы убедиться, что всё записано так, как ожидалось:

Как можно было заметить ранее, результат выгружен только по шестнадцати авторам с первой страницы сайта, и это заняло около семи минут.

Почему так долго? И что происходит при отправке запроса дольше всего? Ожидание.

Для того, чтобы повысить производительность воспользуемся модулем threading: с его помощью запустим потоки, выгружающие произведения по одному автору в одном процессе. При переходе одного потока в состояние ожидания, будет выполняться другой, таким образом время ожидания уменьшится. Для обмена данными с потоком будет использоваться очередь из модуля queue.

Частично перепишем предыдущий скрипт.

Импорты:

from bs4 import BeautifulSoup
import json
import os
from queue import Queue
import requests
import time
import threading

Функция, которая будет работать в потоке:

def getForAuthor(task):
    sess = requests.Session()
    urls = ["https://www.google.ru/", "https://www.youtube.com/"]
    for i in urls:
        sess.get(url=i)

    headers = {
        "accept": "*/*",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
    }

    result = {}
    page = 1
    if not task.empty():
        author = task.get()
        page = 1
        while True:
            url = f"https://www.culture.ru/_next/data/i_YUtTpPEY7K2ap4pw3Y4/literature/poems/{author}.json?pathParams=author-sergei-esenin"
            params = {
                "pathParams": author,
                "page": page
            }

            while True:
                try:
                    res = sess.get(
                        url=url,
                        params=params,
                        headers=headers
                    )
                    break
                except Exception as ex:
                    print(ex)
            js = res.json()
            maxPage = js["pageProps"]["pagination"]["total"]
            for item in js["pageProps"]["poems"]:
                result[item["_id"]] = {
                    "id": item["_id"],
                    "title": item["title"],
                    "text": item["text"]
                }

            if page == maxPage:
                break
            page += 1
            
        with open(f"./data/{author}.json", "w", encoding="utf-8") as file:
            file.write(json.dumps(result, ensure_ascii=False, indent=4))

Запустим:

if __name__ == "__main__":
    start = 1
    stop = 2
    sess = requests.Session()
    authors = []
    # создание экземпляра класса очереди
    task = Queue()
    start = time.time()

    for page in range(start, stop):
        print(page)
        url = f"https://www.culture.ru/literature/poems?page={page}"

        while True:
            try:
                res = sess.get(url=url)
                break
            except Exception as ex:
                print(ex)

        soup = BeautifulSoup(res.text, "html.parser")

        athrs = soup.find_all(attrs={"class": "_6unAn"})
    
        for author in athrs:
            author = author.get("href").replace("/literature/poems/", "")
            os.system("cls")
            
            if author not in authors:
                authors.append(author)
    # заполнение очереди    
    for a in authors:
        task.put(a)
    # запуск потоков
    for _ in range(len(authors)):
        threading.Thread(target=getForAuthor, args=(task,)).start()
    # цикл с условием останова для родительского потока
    while True:
        if threading.active_count() == 1:
            break    
    print("Threading done for", time.time() - start, "seconds", len(authors))

Вывод по окончанию работы, одна страница, те же шестнадцать авторов:

По полученным результатам делаем вывод, что достали те же самые данные, но, при этом, сократили время исполнения почти в два раза. А можно ещё быстрее? Можно, если более детально продумать процессы сбора и обработки данных из очереди.

С помощью модуля threading можно повысить производительность не только при работе с сетью, но и в любых задачах, связанных с вводом-выводом.

Комментарии (16)


  1. hobogene
    14.06.2022 09:54

    Для того, чтобы повысить производительность воспользуемся модулем threading: с его помощью запустим потоки, выгружающие произведения по одному автору в одном процессе.

    Так в потоке или в процессе?


    1. zuko3d
      14.06.2022 23:00

      А мы под линуксом или под виндой? :)


      1. hobogene
        14.06.2022 23:32

        А для Python разве есть разница в данном случае? Я просто спрашиваю, без намеков :-)


        1. zuko3d
          15.06.2022 00:44

          Разница есть для понятия потока и процесса. В линуксе потоки являются процессами, в винде - это отдельные сущности.


          1. hobogene
            15.06.2022 00:46

            Да нет, это-то я в курсе. И, в силу возраста, даже историю вопроса немного помню. Просто вот поток в Python не совсем то же, что поток в операционке (любой).


    1. NewTechAudit Автор
      16.06.2022 11:56

      Здесь речь идёт о том, что все создаваемые потоки работают в рамках одного процесса – исполняемого файла


      1. hobogene
        16.06.2022 14:56

        Похоже, запятая пропущена. Понятно. Спасибо за разъяснение.


  1. Drakosh
    14.06.2022 09:57
    +3

    На текущий момент более актуален AsyncIO.

    https://stackoverflow.com/questions/27435284/multiprocessing-vs-multithreading-vs-asyncio-in-python-3


    1. MentalBlood
      14.06.2022 10:31

      И, для некоторых приложений, библиотека reactive


    1. NewTechAudit Автор
      16.06.2022 11:56

      Соглашусь, asyncio действительно актуален, threading рассматривается как один из подходов, позволяющий снизить время выполнения программ,  решающих задачи, связанные с вводом-выводом


  1. Yuribtr
    14.06.2022 11:19

    Ввиду наличия asyncio и GIL, в классическом CPython не вижу смысла использовать threading. Потому как с threading мы получаем небольшое торможение работы программы из за переключения потоков, а также получаем неудобства связанные с необходимостью синхронизации доступа к общим переменным.


    1. NewTechAudit Автор
      16.06.2022 11:56

      Здесь threading рассматривается как один из подходов, позволяющий снизить время выполнения программ,  решающих задачи, связанные с вводом-выводом


      1. Yuribtr
        16.06.2022 23:40

        Да, один из подходов, не спорю.
        Но использовать его новичкам не рекомендуется из-за того что легко получить "race condition". А для продвинутых такой подход не нужен, так как есть Asyncio и Multiprocessing. Итого применение threading имеет смысл в версиях Python без GIL.
        Мне кажется в статье нехватает такого вот обьяснения.


  1. kAIST
    14.06.2022 11:46

    А если у вас 100-500-1000 заданий, то создаваться столько же потоков?

    Тогда уж multiprocessing с Pool использовать, и wrapper для очередей из того же модуля.


    1. NewTechAudit Автор
      16.06.2022 11:56

      Количество потоков не обязательно равно количеству задач


      1. kAIST
        16.06.2022 13:20

        У вас в коде оно равно. Если будет больше авторов, то будет будет больше потоков. Почему бы сразу это не учитывать и учить делать правильно? Плюс ещё скорее всего ядро процессора будет загружено на 100% из за того что в главном потоке у вас while шпарит на всю катушку. Хотя бы какой то sleep поставьте чтобы ядро не загружалось бесполезной работой.