Предлагаю протестировать скрипт написанный на основе заготовки cloude 3.5 Sonnet с использованием специального промта. Пришлось почти полностью переписать, из-за товарищей в комментариях. Критика это хорошо когда обоснована.
Документацию доделаю потом. Внизу есть пример скрипта.
Системный промт через который нейронка дала заготовку для "Хака".
Скрытый текст
Ты - высококвалифицированный Python-программист и инженер данных в области машинного обучения (ML). Твоя задача - помогать пользователям в написании и отладки кода, анализе данных, оптимизации кода. Ты управляешь агентами экспертной системы, которые помогают тебе в решении различных задач.
Алгоритм логики ответа (Рабочий процесс - Цепочка размышлений)
Уточнение: Задавай вопросы, уточняющие детали контекста. Попроси четко сформулировать задачу по технике SMART (Specific, Measurable, Achievable, Relevant, Time-bound).
Определи тематическую область, выбери агента экспертной системы для решения задачи.
Предварительная обработка запроса: Выдели ключевые элементы. Раздели один большой запрос на несколько подзапросов, задавай сам себе гипотетические вопросы о подзапросах и отвечай на них.
Обработка данных: Проверяй свои гипотезы, аргументируй свою точку зрения, используй комбинацию аналитического и дедуктивного мышления. Определи уровни достоверности для каждого шага рассуждений. Отвечай какие препятствия и трудности могут возникнуть.
Подготовка промежуточного ответа: Критически оцени свой ответ. На основе критики исправь ошибки, дополни код, обеспечь полноту и согласованность ответа. Только после этого переходи к выводу окончательного результата!
Вывод окончательного результата: Необходимо предоставить детализированный, структурированный ответ с примерами, шаг за шагом. Если вы не знаешь ответа на задачу или вопрос, просто скажи что не знаешь, а не выдумывай.
Обратная связь: Запрашивай обратную связь у пользователя после выполнения задачи и предлагай улучшения на основе его комментариев.
Приоритет на высокую скорость обработки данных скрипта и оптимизацию потребления памяти для обработки больших объёмов данных. Используй только те библиотеки которые позволяют добиться высокой скорости обработки данных скрипта при широкой функциональности не приводящие к неоправданно высокому потреблению памяти.
Техники оптимизации кода:
Замена обычных циклов for in где это возможно на оптимизированные генераторы List/generator Comprehension. Пример: collections.deque([item for item in strings]).
Для хранения неизменных списков используй numpy.array или tuple.
Используй map, filter, functools.reduce для оптимизации.
Используй модуль typing для типизации.
Используй декоратор functools.lru_cache для эффективной работы с памятью.
Используй комбинацию корутин asyncio, процессов-демонов Process(daemon=True) и очередей Queue из модуля multiprocessing для параллельного выполнения задач с защитой от глобальной блокировки интерпретатора.
Используй профилирование cProfile для выявления узких мест.
Ограничения:
Не используй классы. Применяй Inline-функции def.
Обеспечь совместимость кода python 3.10-3.12 включительно.
Не используй логирование.
Scrapy конечно хорошая библиотека, но у него много лишнего функционала, нету модульности и иногда очень странно парсит данные с сайта вырывая кусками.
Установка зависимостей:
pip install beautifulsoup4 lxml numpy psutil
Файл fast_soup.py
:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
import asyncio
from functools import lru_cache
import psutil
from typing import AsyncIterator, Optional, Dict, Any
from collections import deque
import numpy as np
from bs4 import BeautifulSoup, Tag
import platform
import warnings
import _pickle as cPickle
def _get_optimal_workers() -> int:
"""Возвращает оптимальное количество процессов"""
return min(cpu_count(), 61) if platform.system() == 'Windows' else cpu_count()
def _get_optimal_cache_size() -> int:
"""Определяет оптимальный размер кэша based on system memory"""
available_memory = psutil.virtual_memory().available
workers = _get_optimal_workers()
base_cache_size = workers * 32
if available_memory < 2 * 1024 * 1024 * 1024: # < 2GB
return min(base_cache_size, 64)
elif available_memory < 4 * 1024 * 1024 * 1024: # < 4GB
return min(base_cache_size, 128)
return min(base_cache_size, 256)
@lru_cache(maxsize=_get_optimal_cache_size())
def _cached_parse(html: str, parser: str) -> BeautifulSoup:
"""Кэширует создание объекта BeautifulSoup"""
return BeautifulSoup(html, parser)
def _should_clear_cache() -> bool:
"""Проверяет необходимость очистки кэша"""
cache_info = _cached_parse.cache_info()
return (cache_info.currsize / cache_info.maxsize) > 0.8
def _parse_nested_tags(tag: Tag, nested_attrs: Dict[str, Any]) -> np.ndarray:
"""Рекурсивно ищет вложенные теги с заданными атрибутами"""
results = deque(tag.find_all(nested_tag, attrs if attrs else {})
for nested_tag, attrs in nested_attrs.items())
return np.array(results, dtype=object)
def _parse_chunk(
html: str,
tag: Optional[str] = None,
attrs: Optional[dict] = None,
nested_attrs: Optional[Dict[str, Dict]] = None,
parser: str = 'lxml'
) -> np.ndarray:
"""Парсит HTML в отдельном процессе с поддержкой вложенных тегов"""
try:
soup = _cached_parse(html, parser)
results = deque()
if tag:
initial_tags = soup.find_all(tag, attrs if attrs else {})
results.extend(initial_tags)
if nested_attrs and initial_tags:
nested_results = [_parse_nested_tags(initial_tag, nested_attrs)
for initial_tag in initial_tags]
results.extend([item for sublist in nested_results for item in sublist])
else:
results.append(soup)
if _should_clear_cache():
_cached_parse.cache_clear()
return cPickle.dumps(np.array([r for r in results if r is not None], dtype=object))
except Exception as e:
warnings.warn(f"Ошибка парсинга: {e}")
return cPickle.dumps(np.array([]))
async def parallel_parse(
html: str,
tag: Optional[str] = None,
attrs: Optional[dict] = None,
nested_attrs: Optional[Dict[str, Dict]] = None,
max_workers: Optional[int] = None,
timeout: float = 30.0,
parser: str = 'lxml'
) -> AsyncIterator[Tag]:
"""
Асинхронный парсер с поддержкой вложенных тегов
"""
try:
max_workers = max_workers or _get_optimal_workers()
loop = asyncio.get_running_loop()
with ProcessPoolExecutor(max_workers=max_workers) as executor:
task = loop.run_in_executor(
executor,
_parse_chunk,
html,
tag,
attrs,
nested_attrs,
parser
)
try:
serialized_result = await asyncio.wait_for(task, timeout)
result = cPickle.loads(serialized_result)
for item in result:
if isinstance(item, Tag):
yield item
except asyncio.TimeoutError:
warnings.warn("Превышен таймаут задачи")
_cached_parse.cache_clear()
except Exception as e:
warnings.warn(f"Ошибка выполнения задачи: {e}")
except Exception as e:
warnings.warn(f"Общая ошибка парсинга: {e}")
finally:
_cached_parse.cache_clear()
Документация по асинхронному HTML парсеру
Содержание
Общее описание
Основные функции
Примеры использования
Оптимизация производительности
Обработка ошибок
1. Общее описание
Данный модуль представляет собой высокопроизводительный асинхронный HTML парсер с поддержкой многопроцессорной обработки. Основные преимущества:
Асинхронное выполнение
Многопроцессорная обработка
Кэширование результатов
Оптимизированное использование памяти
Поддержка вложенных тегов
2. Основные функции
parallel_parse
Основная функция для асинхронного парсинга HTML:
async def parallel_parse(
html: str, # HTML строка для парсинга
tag: Optional[str] = None, # Тег для поиска
attrs: Optional[dict] = None, # Атрибуты тега
nested_attrs: Optional[Dict[str, Dict]] = None, # Вложенные теги
max_workers: Optional[int] = None, # Количество процессов
timeout: float = 30.0, # Таймаут выполнения
parser: str = 'lxml' # Тип парсера
) -> AsyncIterator[Tag]
3. Примеры использования
Базовый пример
import asyncio
async def main():
html = """
<div class="content">
<p>Текст 1</p>
<p>Текст 2</p>
</div>
"""
async for tag in parallel_parse(html, tag='p'):
print(tag.text)
asyncio.run(main())
Поиск с атрибутами
async def find_with_attrs():
html = """
<div>
<span class="price">100</span>
<span class="price">200</span>
<span class="name">Product</span>
</div>
"""
async for tag in parallel_parse(
html,
tag='span',
attrs={'class': 'price'}
):
print(tag.text)
Поиск вложенных тегов
async def nested_search():
html = """
<div class="product">
<h2>Product Title</h2>
<div class="details">
<span class="price">100</span>
</div>
</div>
"""
nested_attrs = {
'h2': {},
'span': {'class': 'price'}
}
async for tag in parallel_parse(
html,
tag='div',
attrs={'class': 'product'},
nested_attrs=nested_attrs
):
print(tag.text)
4. Оптимизация производительности
Настройка количества процессов
async def optimized_parse():
# Установка оптимального количества процессов
max_workers = cpu_count() - 1
async for tag in parallel_parse(
html,
tag='div',
max_workers=max_workers
):
process_tag(tag)
Управление таймаутом
async def with_timeout():
try:
async for tag in parallel_parse(
large_html,
timeout=5.0 # 5 секунд таймаут
):
process_tag(tag)
except Exception as e:
print(f"Превышен таймаут: {e}")
5. Обработка ошибок
Обработка исключений
async def handle_errors():
try:
async for tag in parallel_parse(
malformed_html,
tag='div'
):
process_tag(tag)
except Exception as e:
print(f"Ошибка парсинга: {e}")
finally:
# Очистка ресурсов
cleanup_resources()
digtatordigtatorov
Докатились, статьи на Хабре где тупо выкидывают ответ ллм модели…
avalonsec Автор
Ок. Понял.
avalonsec Автор
Этот код можно было взять за основу и доработать. Как минимум взять её него идеи.
avalonsec Автор
Переписал. Полностью рабочий код, только gc что то тупил пришлось пока убрать. Обычно в комбинации с декораторами работает нормально.
danilovmy
Не стоит использовать function composition. Они не обрабатываются стандартным pickle и потому не могут быть переданы на выполнение в соседний поток или процесс. Больше тут: The Ghosts of Distant Objects | PyCon Lithuania 2024 https://pycon.lt/2024/talks/YUXXZS
avalonsec Автор
Спасибо, на досуге почитаю.
zoldaten
danilovmy вы про который function composition ? и что за странная ссылка pycon, что там читать ?
danilovmy
@zoldaten, спасибо за вопрос. На всякий случай уточню, что мой предыдущий комментарий относится к этому моменту статьи:
Я встречал, как вложенные функции в одну функцию называют functions composition (fc). (без пруфа)
Читать про вложенные функции можно, например, тут https://realpython.com/inner-functions-what-are-they-good-for/#creating-python-inner-functions. правда там обертку для вложенных функций никак не называют.
Описание проблемы "запикливания" функций, содержащих вложенные функции, можно найти тут: https://stackoverflow.com/questions/72766345/attributeerror-cant-pickle-local-object-in-multiprocessing, первый ответ про "pickle can only serialize top-module level functions in general".
По моей ссылке на PyCon надо не читать, а смотреть про то, как чел рассказывает и показывает что врожденный pickle не работает при передаче fc между процессами, и какой костыль они соорудили в библиотеке dill, чтобы сделать это возможным.
В стилистике fc написан FastAPI, благодаря чему он не работает в интеграционных тестах с использованием multiprocessing и авторам Fastapi понадобилось соорудить целый доп класс для тестов. дискуссия по этому поводу тут: https://github.com/fastapi/fastapi/discussions/10213
avalonsec Автор
Как я понял он про передачу данных между процессами, если передавать данные локально, то да лучше сериализавать в pickle.