В предыдущей статье мы распарсили реплей одного матча по Dota 2 и нашли хайлайты с помощью кластеризации. В данной статье увеличим масштаб и напишем сервис для параллельного парсинга реплеев на Celery и Flask.

Под катом

  1. Собираем ссылки на реплеи матчей The International 2021.

  2. Распараллеливаем парсинг матчей с помощью Celery.

  3. Пишем минимальное API на Flask.

  4. Запускаем парсинг.

  5. Делаем выводы.

  6. Все ссылки на код и использованные материалы вы найдете в конце статьи.

Собираем ссылки на реплеи матчей The International 2021

Качать репелеи через клиент игры — замечательная опция, но довольно нудная для десятков и сотен матчей. Поэтому мы пойдем иным путем и достанем ссылки на реплеи через OpenDota API.

Для начала нам потребуется узнать Tournament ID для TI 2021. Например, для этого можно зайти на Dotabuff, найти в поиске страницу турнира и пристально посмотреть на URL.

Ссылка на TI2021 на Dotabuff
Ссылка на TI2021 на Dotabuff

Сделаем запрос к API и получим Match ID идентификаторы всех матчей с турнира.

import requests

tournament_id = 13256
r = requests.get(
  f'https://api.opendota.com/api/leagues/{tournament_id}/matches')
matches = r.json()
len(matches)

> 487

Теперь для каждого матча нужно сходить в Replay API. Здесь стоит учесть, что бесплатная версия API позволяет делать до 60 запросов в минуту.

import time

limit = 14
replays = []
for i, m in enumerate(matches):
    if i == limit:
        break
    
    match_id = m['match_id']
    r = requests.get(
      'https://api.opendota.com/api/replays/', 
      params=dict(match_id=match_id)
    )
    r.raise_for_status()
    replays.append(r.json())
    time.sleep(0.05)
    
replays
> [
   [{'match_id': 6078908851, 'cluster': 236, 'replay_salt': 908561694}],
   [{'match_id': 6056286110, 'cluster': 187, 'replay_salt': 1627987562}],
  ...

Ссылки на реплеи получаются подставлением значений в шаблон

http://replay{cluster}.valve.net/570/{match_id}_{replay_salt}.dem.bz2.

Нюанс в том, что не все ссылки окажутся рабочими, т.к. в некоторых матчах игроки рестартуют лобби. Отфильтруем их, сделав HEAD запросы.

urls = []
for replay in replays:
    cluster = replay[0]['cluster']
    match_id = replay[0]['match_id']
    replay_salt = replay[0]['replay_salt']
    url = f'http://replay{cluster}.valve.net/570/{match_id}_{replay_salt}.dem.bz2'

    r = requests.head(url)
    print(f'Status: {r.status_code}, URL: {url}')
    if r.status_code == 200:
        urls.append(url)
len(urls)
> 10

Из первых 14 ссылок 10 оказались рабочими. Вполне достаточно для теста, т.к. в среднем один .dem реплей весит 100 MB.

Распараллеливаем парсинг реплеев с помощью Celery

Здесь я не буду подробно останавливаться на Celery, на эту тему уже написано множество статей. Просто напомню, что он позволяет выполнять очереди из задач. В качестве брокера я использовал локальный Redis. Воркеры также работали локально, а всю работу по созданию отдельных процессов под задачи берет на себя Celery.

Структура проекта выглядит следующим образом.

$ tree -P '*.py' -I '__pycache__' src/

├── async_parser
│   ├── celery.py
│   └── tasks.py
├── server.py
├── settings.py
└── sync_parser.py

1 directory, 5 files

Ниже я приведу выжимку кода. Полную версию вы найдете по ссылке на репозиторий в конце статьи.

Подготовительные работы

Для начала поднимем Redis.

sudo docker run --name dota-redis -p 6379:6379 -d redis

А также запустим на локальном порту 5600 Clarity Parser. Подробнее о нем вы можете почитать в предыдущей части.

git clone https://github.com/arch1baald/clarity-parser.git parser
sudo docker build -t odota/parser parser/
sudo docker run -d --name clarity-parser -p 5600:5600 odota/parser

Суть

В файле celery.py определяем объект Celery.

from celery import Celery

app = Celery(
    'async_parser',
    broker=REDIS_URL,
    backend=REDIS_URL,
    include=['async_parser.tasks'],
    accept=['json']
)

app.start()

А в файле tasks.py определим две основные задачи.

  1. download_save(URL) — Скачать .dem реплей и сохранить локально

  2. parse(dem_path) — Прогнать реплей через Clarity Parser, получить лог событий в формате .jsonlines и сохранить локально

А чтобы парсинг не начинался до того, как завершилась загрузка реплея воспользуемся celery.chain().

1. Загрузка реплея

Задачи для Celery помечаются специальным декоратором @app.task().

def download(url):
    logger.info(f'Downloading: {url}...')
    r = requests.get(url)
    r.raise_for_status()
    
    compressed_dem = r.content
    logger.info(f'Decompressing: {url}...')
    dem = decompress(compressed_dem)
    return dem


@app.task()
def download_save(url):
    right = url.split('/')[-1]
    match_salt = right.replace('dem.bz2', '')
    file_name = match_salt.split('_')[0]
    file_name += '.dem'
    path = os.path.join(REPLAY_DIR, file_name)
    
    if os.path.exists(path):
        logger.info(f'Dem file already exists: {path}...')
        return path

    dem = download(url)
    with open(path, 'wb') as fout:
        fout.write(dem)
    logger.info(f'Saved to {path}...')
    return path

Чтобы не раздувать Redis, задача возвращает путь к файлу реплея на диске, а не содержимое самого файла. В случае, если вы хотите обработать все матчи с турнира, можно использовать S3 вместо локального диска.

2. Парсинг

@app.task()
def parse(dem_path, remove_dem=False):
    jsonlines_path = dem_path.replace('.dem', '.jsonlines')

    logger.info(f'Parsing {jsonlines_path}...')
    cmd = f'curl localhost:5600 --data-binary "@{dem_path}" > {jsonlines_path}'
    subprocess.run(cmd, shell=True)

    if os.path.getsize(jsonlines_path) == 0:
        os.remove(jsonlines_path)
        raise ClarityParserException(
            f'Result file is empty: {jsonlines_path}...\nDid you forget to run odota/parser?')
    
    if os.path.exists(dem_path) and remove_dem:
        logger.info(f'Removing temporary file {dem_path}...')
        os.remove(dem_path)
    return jsonlines_path

Финальная функция, объединяющая предыдущие задачи в последовательность. Обратите внимание, что в parse мы не передаем аргумент dem_path, потому что chain сам подставляет результат из download_save.s(url).

def download_parse_save(url):
    res = chain(download_save.s(url), parse.s())()
    return res

Запустим Celery.

celery -A async_parser worker -l INFO

Пишем минимальное API на Flask

В данный момент наша очередь задач на парсинг пуста. Чтобы это исправить, напишем простое API, которое принимает в качестве параметра ссылку на реплей и добавляет задачу в очередь.

В файле server.py напишем логику для ручки localhost:5000/parse.

from flask import Flask, request, jsonify

from async_parser.tasks import download_parse_save


app = Flask(__name__)


@app.route('/parse')
def parse():
    dem_url = request.args.get('url')
    logger.info(f'{dem_url=}')
    if dem_url is None:
        return jsonify(dict(
            success=False, 
            error='Demo URL not found'
        )), 400

    async_result = download_parse_save(dem_url)
    logger.info(f'{async_result}')
    return jsonify(dict(
        success=True,
        url=dem_url,
        job_id=async_result.id
    ))

Запустим веб-сервер.

FLASK_APP=server flask run

Запускаем парсинг

Для этого сделаем запрос к серверу.

curl -X GET 'http://localhost:5000/parse?url=http://replay191.valve.net/570/6216665747_89886887.dem.bz2'
  • Запрос обрабатывает Flask API и добавляет цепочку из двух задач в очередь с помощью download_parse_save(dem_url)

  • Celery worker видит задачу download_save(url), идет по ссылке http://replay191.valve.net/570/6216665747_89886887.dem.bz2 на CDN Valve, скачивает реплей и сохраняет на диске

  • Celery worker видит задачу parse(dem_path) и делает запрос к Clarity Parser

  • Результат работы Clarity Parser сохраняется в формате .jsonlines лога на диске

Отлично! Теперь вспомним, что изначально нашей задачей было распарсить 10 матчей с The International 2021, ссылки на которые мы получили ранее. Для этого запустим простой скрипт.

with open(os.path.join(REPLAY_DIR, 'urls.txt'), 'r') as fin:
  for url in fin:
    url = url.strip()

    try:
      r = requests.get('http://localhost:5000/parse', params=dict(url=url))
      r.raise_for_status()
      except Exception as e:
        logger.info(e)
        continue

        logger.info(r.json())
        time.sleep(0.05)

Отойдем на пару минут, нальем чаю, а по возвращению обнаружим файлы с результатами.

ls replays

6066863360.jsonlines	6215020578.jsonlines	6216545156.jsonlines	6227492909.jsonlines
6079386505.jsonlines	6215346651.jsonlines	6216665747.jsonlines	urls.txt
6214179880.jsonlines	6216526891.jsonlines	6227203516.jsonlines

Пример содержимого одного из файлов.

Делаем выводы

Вы дочитали статью — вы великолепны. Теперь вы можете парсить сотни матчей по Dota 2 за разумное время.

В следующей части мы вернемся к теме поиска хайлайтов, улучшим алгоритм и обкатаем его на нескольких матчах.

Ссылки

Комментарии (0)