В предыдущей статье мы распарсили реплей одного матча по Dota 2 и нашли хайлайты с помощью кластеризации. В данной статье увеличим масштаб и напишем сервис для параллельного парсинга реплеев на Celery и Flask.
Под катом
Собираем ссылки на реплеи матчей The International 2021.
Распараллеливаем парсинг матчей с помощью Celery.
Пишем минимальное API на Flask.
Запускаем парсинг.
Делаем выводы.
Все ссылки на код и использованные материалы вы найдете в конце статьи.
Собираем ссылки на реплеи матчей The International 2021
Качать репелеи через клиент игры — замечательная опция, но довольно нудная для десятков и сотен матчей. Поэтому мы пойдем иным путем и достанем ссылки на реплеи через OpenDota API.
Для начала нам потребуется узнать Tournament ID
для TI 2021. Например, для этого можно зайти на Dotabuff, найти в поиске страницу турнира и пристально посмотреть на URL.
Сделаем запрос к API и получим Match ID
идентификаторы всех матчей с турнира.
import requests
tournament_id = 13256
r = requests.get(
f'https://api.opendota.com/api/leagues/{tournament_id}/matches')
matches = r.json()
len(matches)
> 487
Теперь для каждого матча нужно сходить в Replay API. Здесь стоит учесть, что бесплатная версия API позволяет делать до 60 запросов в минуту.
import time
limit = 14
replays = []
for i, m in enumerate(matches):
if i == limit:
break
match_id = m['match_id']
r = requests.get(
'https://api.opendota.com/api/replays/',
params=dict(match_id=match_id)
)
r.raise_for_status()
replays.append(r.json())
time.sleep(0.05)
replays
> [
[{'match_id': 6078908851, 'cluster': 236, 'replay_salt': 908561694}],
[{'match_id': 6056286110, 'cluster': 187, 'replay_salt': 1627987562}],
...
Ссылки на реплеи получаются подставлением значений в шаблон
http://replay{cluster}.valve.net/570/{match_id}_{replay_salt}.dem.bz2
.
Нюанс в том, что не все ссылки окажутся рабочими, т.к. в некоторых матчах игроки рестартуют лобби. Отфильтруем их, сделав HEAD запросы.
urls = []
for replay in replays:
cluster = replay[0]['cluster']
match_id = replay[0]['match_id']
replay_salt = replay[0]['replay_salt']
url = f'http://replay{cluster}.valve.net/570/{match_id}_{replay_salt}.dem.bz2'
r = requests.head(url)
print(f'Status: {r.status_code}, URL: {url}')
if r.status_code == 200:
urls.append(url)
len(urls)
> 10
Из первых 14 ссылок 10 оказались рабочими. Вполне достаточно для теста, т.к. в среднем один .dem
реплей весит 100 MB.
Распараллеливаем парсинг реплеев с помощью Celery
Здесь я не буду подробно останавливаться на Celery
, на эту тему уже написано множество статей. Просто напомню, что он позволяет выполнять очереди из задач. В качестве брокера я использовал локальный Redis
. Воркеры также работали локально, а всю работу по созданию отдельных процессов под задачи берет на себя Celery
.
Структура проекта выглядит следующим образом.
$ tree -P '*.py' -I '__pycache__' src/
├── async_parser
│ ├── celery.py
│ └── tasks.py
├── server.py
├── settings.py
└── sync_parser.py
1 directory, 5 files
Ниже я приведу выжимку кода. Полную версию вы найдете по ссылке на репозиторий в конце статьи.
Подготовительные работы
Для начала поднимем Redis
.
sudo docker run --name dota-redis -p 6379:6379 -d redis
А также запустим на локальном порту 5600 Clarity Parser
. Подробнее о нем вы можете почитать в предыдущей части.
git clone https://github.com/arch1baald/clarity-parser.git parser
sudo docker build -t odota/parser parser/
sudo docker run -d --name clarity-parser -p 5600:5600 odota/parser
Суть
В файле celery.py
определяем объект Celery
.
from celery import Celery
app = Celery(
'async_parser',
broker=REDIS_URL,
backend=REDIS_URL,
include=['async_parser.tasks'],
accept=['json']
)
app.start()
А в файле tasks.py
определим две основные задачи.
download_save(URL)
— Скачать.dem
реплей и сохранить локальноparse(dem_path)
— Прогнать реплей через Clarity Parser, получить лог событий в формате.jsonlines
и сохранить локально
А чтобы парсинг не начинался до того, как завершилась загрузка реплея воспользуемся celery.chain()
.
1. Загрузка реплея
Задачи для Celery помечаются специальным декоратором @app.task()
.
def download(url):
logger.info(f'Downloading: {url}...')
r = requests.get(url)
r.raise_for_status()
compressed_dem = r.content
logger.info(f'Decompressing: {url}...')
dem = decompress(compressed_dem)
return dem
@app.task()
def download_save(url):
right = url.split('/')[-1]
match_salt = right.replace('dem.bz2', '')
file_name = match_salt.split('_')[0]
file_name += '.dem'
path = os.path.join(REPLAY_DIR, file_name)
if os.path.exists(path):
logger.info(f'Dem file already exists: {path}...')
return path
dem = download(url)
with open(path, 'wb') as fout:
fout.write(dem)
logger.info(f'Saved to {path}...')
return path
Чтобы не раздувать Redis, задача возвращает путь к файлу реплея на диске, а не содержимое самого файла. В случае, если вы хотите обработать все матчи с турнира, можно использовать S3 вместо локального диска.
2. Парсинг
@app.task()
def parse(dem_path, remove_dem=False):
jsonlines_path = dem_path.replace('.dem', '.jsonlines')
logger.info(f'Parsing {jsonlines_path}...')
cmd = f'curl localhost:5600 --data-binary "@{dem_path}" > {jsonlines_path}'
subprocess.run(cmd, shell=True)
if os.path.getsize(jsonlines_path) == 0:
os.remove(jsonlines_path)
raise ClarityParserException(
f'Result file is empty: {jsonlines_path}...\nDid you forget to run odota/parser?')
if os.path.exists(dem_path) and remove_dem:
logger.info(f'Removing temporary file {dem_path}...')
os.remove(dem_path)
return jsonlines_path
Финальная функция, объединяющая предыдущие задачи в последовательность. Обратите внимание, что в parse
мы не передаем аргумент dem_path
, потому что chain
сам подставляет результат из download_save.s(url)
.
def download_parse_save(url):
res = chain(download_save.s(url), parse.s())()
return res
Запустим Celery.
celery -A async_parser worker -l INFO
Пишем минимальное API на Flask
В данный момент наша очередь задач на парсинг пуста. Чтобы это исправить, напишем простое API, которое принимает в качестве параметра ссылку на реплей и добавляет задачу в очередь.
В файле server.py
напишем логику для ручки localhost:5000/parse
.
from flask import Flask, request, jsonify
from async_parser.tasks import download_parse_save
app = Flask(__name__)
@app.route('/parse')
def parse():
dem_url = request.args.get('url')
logger.info(f'{dem_url=}')
if dem_url is None:
return jsonify(dict(
success=False,
error='Demo URL not found'
)), 400
async_result = download_parse_save(dem_url)
logger.info(f'{async_result}')
return jsonify(dict(
success=True,
url=dem_url,
job_id=async_result.id
))
Запустим веб-сервер.
FLASK_APP=server flask run
Запускаем парсинг
Для этого сделаем запрос к серверу.
curl -X GET 'http://localhost:5000/parse?url=http://replay191.valve.net/570/6216665747_89886887.dem.bz2'
Запрос обрабатывает Flask API и добавляет цепочку из двух задач в очередь с помощью
download_parse_save(dem_url)
Celery worker видит задачу
download_save(url)
, идет по ссылкеhttp://replay191.valve.net/570/6216665747_89886887.dem.bz2
на CDN Valve, скачивает реплей и сохраняет на дискеCelery worker видит задачу
parse(dem_path)
и делает запрос к Clarity ParserРезультат работы Clarity Parser сохраняется в формате
.jsonlines
лога на диске
Отлично! Теперь вспомним, что изначально нашей задачей было распарсить 10 матчей с The International 2021, ссылки на которые мы получили ранее. Для этого запустим простой скрипт.
with open(os.path.join(REPLAY_DIR, 'urls.txt'), 'r') as fin:
for url in fin:
url = url.strip()
try:
r = requests.get('http://localhost:5000/parse', params=dict(url=url))
r.raise_for_status()
except Exception as e:
logger.info(e)
continue
logger.info(r.json())
time.sleep(0.05)
Отойдем на пару минут, нальем чаю, а по возвращению обнаружим файлы с результатами.
ls replays
6066863360.jsonlines 6215020578.jsonlines 6216545156.jsonlines 6227492909.jsonlines
6079386505.jsonlines 6215346651.jsonlines 6216665747.jsonlines urls.txt
6214179880.jsonlines 6216526891.jsonlines 6227203516.jsonlines
Пример содержимого одного из файлов.
Делаем выводы
Вы дочитали статью — вы великолепны. Теперь вы можете парсить сотни матчей по Dota 2 за разумное время.
В следующей части мы вернемся к теме поиска хайлайтов, улучшим алгоритм и обкатаем его на нескольких матчах.