Предлагаю протестировать скрипт написанный на основе заготовки cloude 3.5 Sonnet с использованием специального промта. Пришлось почти полностью переписать, из-за товарищей в комментариях. Критика это хорошо когда обоснована.

Документацию доделаю потом. Внизу есть пример скрипта.

Системный промт через который нейронка дала заготовку для "Хака".

Скрытый текст

Ты - высококвалифицированный Python-программист и инженер данных в области машинного обучения (ML). Твоя задача - помогать пользователям в написании и отладки кода, анализе данных, оптимизации кода. Ты управляешь агентами экспертной системы, которые помогают тебе в решении различных задач.

Алгоритм логики ответа (Рабочий процесс - Цепочка размышлений)

  1. Уточнение: Задавай вопросы, уточняющие детали контекста. Попроси четко сформулировать задачу по технике SMART (Specific, Measurable, Achievable, Relevant, Time-bound).

  2. Определи тематическую область, выбери агента экспертной системы для решения задачи.

  3. Предварительная обработка запроса: Выдели ключевые элементы. Раздели один большой запрос на несколько подзапросов, задавай сам себе гипотетические вопросы о подзапросах и отвечай на них.

  4. Обработка данных: Проверяй свои гипотезы, аргументируй свою точку зрения, используй комбинацию аналитического и дедуктивного мышления. Определи уровни достоверности для каждого шага рассуждений. Отвечай какие препятствия и трудности могут возникнуть.

  5. Подготовка промежуточного ответа: Критически оцени свой ответ. На основе критики исправь ошибки, дополни код, обеспечь полноту и согласованность ответа. Только после этого переходи к выводу окончательного результата!

  6. Вывод окончательного результата: Необходимо предоставить детализированный, структурированный ответ с примерами, шаг за шагом. Если вы не знаешь ответа на задачу или вопрос, просто скажи что не знаешь, а не выдумывай.

  7. Обратная связь: Запрашивай обратную связь у пользователя после выполнения задачи и предлагай улучшения на основе его комментариев.

Приоритет на высокую скорость обработки данных скрипта и оптимизацию потребления памяти для обработки больших объёмов данных. Используй только те библиотеки которые позволяют добиться высокой скорости обработки данных скрипта при широкой функциональности не приводящие к неоправданно высокому потреблению памяти.

Техники оптимизации кода:

  • Замена обычных циклов for in где это возможно на оптимизированные генераторы List/generator Comprehension. Пример: collections.deque([item for item in strings]).

  • Для хранения неизменных списков используй numpy.array или tuple.

  • Используй map, filter, functools.reduce для оптимизации.

  • Используй модуль typing для типизации.

  • Используй декоратор functools.lru_cache для эффективной работы с памятью.

  • Используй комбинацию корутин asyncio, процессов-демонов Process(daemon=True) и очередей Queue из модуля multiprocessing для параллельного выполнения задач с защитой от глобальной блокировки интерпретатора.

  • Используй профилирование cProfile для выявления узких мест.

Ограничения:

  • Не используй классы. Применяй Inline-функции def.

  • Обеспечь совместимость кода python 3.10-3.12 включительно.

  • Не используй логирование.

Scrapy конечно хорошая библиотека, но у него много лишнего функционала, нету модульности и иногда очень странно парсит данные с сайта вырывая кусками.

Установка зависимостей:

pip install beautifulsoup4 lxml numpy psutil

Файл fast_soup.py:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
import asyncio
from functools import lru_cache
import psutil
from typing import AsyncIterator, Optional, Dict, Any
from collections import deque
import numpy as np
from bs4 import BeautifulSoup, Tag
import platform
import warnings
import _pickle as cPickle


def _get_optimal_workers() -> int:
    """Возвращает оптимальное количество процессов"""
    return min(cpu_count(), 61) if platform.system() == 'Windows' else cpu_count()


def _get_optimal_cache_size() -> int:
    """Определяет оптимальный размер кэша based on system memory"""
    available_memory = psutil.virtual_memory().available
    workers = _get_optimal_workers()
    base_cache_size = workers * 32

    if available_memory < 2 * 1024 * 1024 * 1024:  # < 2GB
        return min(base_cache_size, 64)
    elif available_memory < 4 * 1024 * 1024 * 1024:  # < 4GB
        return min(base_cache_size, 128)
    return min(base_cache_size, 256)


@lru_cache(maxsize=_get_optimal_cache_size())
def _cached_parse(html: str, parser: str) -> BeautifulSoup:
    """Кэширует создание объекта BeautifulSoup"""
    return BeautifulSoup(html, parser)


def _should_clear_cache() -> bool:
    """Проверяет необходимость очистки кэша"""
    cache_info = _cached_parse.cache_info()
    return (cache_info.currsize / cache_info.maxsize) > 0.8


def _parse_nested_tags(tag: Tag, nested_attrs: Dict[str, Any]) -> np.ndarray:
    """Рекурсивно ищет вложенные теги с заданными атрибутами"""
    results = deque(tag.find_all(nested_tag, attrs if attrs else {})
               for nested_tag, attrs in nested_attrs.items())
    return np.array(results, dtype=object)


def _parse_chunk(
        html: str,
        tag: Optional[str] = None,
        attrs: Optional[dict] = None,
        nested_attrs: Optional[Dict[str, Dict]] = None,
        parser: str = 'lxml'
) -> np.ndarray:
    """Парсит HTML в отдельном процессе с поддержкой вложенных тегов"""
    try:
        soup = _cached_parse(html, parser)
        results = deque()

        if tag:
            initial_tags = soup.find_all(tag, attrs if attrs else {})
            results.extend(initial_tags)

            if nested_attrs and initial_tags:
                nested_results = [_parse_nested_tags(initial_tag, nested_attrs)
                                  for initial_tag in initial_tags]
                results.extend([item for sublist in nested_results for item in sublist])
        else:
            results.append(soup)

        if _should_clear_cache():
            _cached_parse.cache_clear()

        return cPickle.dumps(np.array([r for r in results if r is not None], dtype=object))

    except Exception as e:
        warnings.warn(f"Ошибка парсинга: {e}")
        return cPickle.dumps(np.array([]))


async def parallel_parse(
        html: str,
        tag: Optional[str] = None,
        attrs: Optional[dict] = None,
        nested_attrs: Optional[Dict[str, Dict]] = None,
        max_workers: Optional[int] = None,
        timeout: float = 30.0,
        parser: str = 'lxml'
) -> AsyncIterator[Tag]:
    """
    Асинхронный парсер с поддержкой вложенных тегов
    """
    try:
        max_workers = max_workers or _get_optimal_workers()
        loop = asyncio.get_running_loop()

        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            task = loop.run_in_executor(
                executor,
                _parse_chunk,
                html,
                tag,
                attrs,
                nested_attrs,
                parser
            )

            try:
                serialized_result = await asyncio.wait_for(task, timeout)
                result = cPickle.loads(serialized_result)

                for item in result:
                    if isinstance(item, Tag):
                        yield item

            except asyncio.TimeoutError:
                warnings.warn("Превышен таймаут задачи")
                _cached_parse.cache_clear()
            except Exception as e:
                warnings.warn(f"Ошибка выполнения задачи: {e}")

    except Exception as e:
        warnings.warn(f"Общая ошибка парсинга: {e}")
    finally:
        _cached_parse.cache_clear()

Документация по асинхронному HTML парсеру

Содержание

  1. Общее описание

  2. Основные функции

  3. Примеры использования

  4. Оптимизация производительности

  5. Обработка ошибок

1. Общее описание

Данный модуль представляет собой высокопроизводительный асинхронный HTML парсер с поддержкой многопроцессорной обработки. Основные преимущества:

  • Асинхронное выполнение

  • Многопроцессорная обработка

  • Кэширование результатов

  • Оптимизированное использование памяти

  • Поддержка вложенных тегов

2. Основные функции

parallel_parse

Основная функция для асинхронного парсинга HTML:

async def parallel_parse(
    html: str,                    # HTML строка для парсинга
    tag: Optional[str] = None,    # Тег для поиска
    attrs: Optional[dict] = None, # Атрибуты тега
    nested_attrs: Optional[Dict[str, Dict]] = None, # Вложенные теги
    max_workers: Optional[int] = None,  # Количество процессов
    timeout: float = 30.0,        # Таймаут выполнения
    parser: str = 'lxml'          # Тип парсера
) -&gt; AsyncIterator[Tag]

3. Примеры использования

Базовый пример

import asyncio

async def main():
    html = """
    
        
            <div class="content">
                <p>Текст 1</p>
                <p>Текст 2</p>
            </div>
        
    
    """
    
    async for tag in parallel_parse(html, tag='p'):
        print(tag.text)

asyncio.run(main())

Поиск с атрибутами

async def find_with_attrs():
    html = """
    <div>
        <span class="price">100</span>
        <span class="price">200</span>
        <span class="name">Product</span>
    </div>
    """
    
    async for tag in parallel_parse(
        html, 
        tag='span', 
        attrs={'class': 'price'}
    ):
        print(tag.text)

Поиск вложенных тегов

async def nested_search():
    html = """
    <div class="product">
        <h2>Product Title</h2>
        <div class="details">
            <span class="price">100</span>
        </div>
    </div>
    """
    
    nested_attrs = {
        'h2': {},
        'span': {'class': 'price'}
    }
    
    async for tag in parallel_parse(
        html,
        tag='div',
        attrs={'class': 'product'},
        nested_attrs=nested_attrs
    ):
        print(tag.text)

4. Оптимизация производительности

Настройка количества процессов

async def optimized_parse():
    # Установка оптимального количества процессов
    max_workers = cpu_count() - 1
    
    async for tag in parallel_parse(
        html,
        tag='div',
        max_workers=max_workers
    ):
        process_tag(tag)

Управление таймаутом

async def with_timeout():
    try:
        async for tag in parallel_parse(
            large_html,
            timeout=5.0  # 5 секунд таймаут
        ):
            process_tag(tag)
    except Exception as e:
        print(f"Превышен таймаут: {e}")

5. Обработка ошибок

Обработка исключений

async def handle_errors():
    try:
        async for tag in parallel_parse(
            malformed_html,
            tag='div'
        ):
            process_tag(tag)
    except Exception as e:
        print(f"Ошибка парсинга: {e}")
    finally:
        # Очистка ресурсов
        cleanup_resources()

Комментарии (9)


  1. digtatordigtatorov
    29.10.2024 14:23

    Докатились, статьи на Хабре где тупо выкидывают ответ ллм модели…


    1. avalonsec Автор
      29.10.2024 14:23

      Ок. Понял.


    1. avalonsec Автор
      29.10.2024 14:23

      Этот код можно было взять за основу и доработать. Как минимум взять её него идеи.


    1. avalonsec Автор
      29.10.2024 14:23

      Переписал. Полностью рабочий код, только gc что то тупил пришлось пока убрать. Обычно в комбинации с декораторами работает нормально.


      1. danilovmy
        29.10.2024 14:23

        Не стоит использовать function composition. Они не обрабатываются стандартным pickle и потому не могут быть переданы на выполнение в соседний поток или процесс. Больше тут: The Ghosts of Distant Objects | PyCon Lithuania 2024 https://pycon.lt/2024/talks/YUXXZS


        1. avalonsec Автор
          29.10.2024 14:23

          Спасибо, на досуге почитаю.


        1. zoldaten
          29.10.2024 14:23

          danilovmy вы про который function composition ? и что за странная ссылка pycon, что там читать ?


          1. danilovmy
            29.10.2024 14:23

            @zoldaten, спасибо за вопрос. На всякий случай уточню, что мой предыдущий комментарий относится к этому моменту статьи:

            • Не используй классы. Применяй Inline-функции def.

            Я встречал, как вложенные функции в одну функцию называют functions composition (fc). (без пруфа)

            Читать про вложенные функции можно, например, тут https://realpython.com/inner-functions-what-are-they-good-for/#creating-python-inner-functions. правда там обертку для вложенных функций никак не называют.

            Описание проблемы "запикливания" функций, содержащих вложенные функции, можно найти тут: https://stackoverflow.com/questions/72766345/attributeerror-cant-pickle-local-object-in-multiprocessing, первый ответ про "pickle can only serialize top-module level functions in general".

            По моей ссылке на PyCon надо не читать, а смотреть про то, как чел рассказывает и показывает что врожденный pickle не работает при передаче fc между процессами, и какой костыль они соорудили в библиотеке dill, чтобы сделать это возможным.

            В стилистике fc написан FastAPI, благодаря чему он не работает в интеграционных тестах с использованием multiprocessing и авторам Fastapi понадобилось соорудить целый доп класс для тестов. дискуссия по этому поводу тут: https://github.com/fastapi/fastapi/discussions/10213


          1. avalonsec Автор
            29.10.2024 14:23

            Как я понял он про передачу данных между процессами, если передавать данные локально, то да лучше сериализавать в pickle.