Возникла задача проверки нескольких типов пользовательских документов Excel. Проверка должна покрывать такие аспекты, как корректность шаблона (наличие ожидаемых страниц, колонок таблиц) и корректность данных (присутствие обязательных значений, соответствие форматам, отсутствие дубликации, итд).

Пользователю нужно возвращать информацию "что не так с файлом": какую проверку не прошел файл и где конкретно в файле проблемные данные.

Эта задача очень напоминает тестирование - тестирование качества данных. Так почему бы не использовать фреймворк тестирования pytest, и не написать тесты на каждый проверяемый аспект и для каждого типа файлов? Однако, есть небольшое "но": проверка должна быть реализована в качестве сервиса, чтобы встраиваться в более широкий процесс обработки пользовательских документов.

Давайте посмотрим, как заставить pytest работать внутри сервиса. Это не так тривиально, как может показаться на первый взгляд.

Начнем с простого - немного "потестируем файл"

Для начала просто убедимся, что использование pytest для проверки файла вообще имеет смысл. Сделаем файл теста и поглядим, как все работает.

Для примера представим файл в котором одна страница "Config". На ней простая таблица из трех колонок: "Code", "TextData", "Numbers" (этот файл лежит здесь).

Проверим, что:

  1. Страница с таким именем есть

  2. В таблице на этой странице, если ее открыть с помощью pandas есть все колонки которые мы ожидаем.

  3. В колонках "Code", "TextData" каждая строка содержит данные. Это будет пример "построчной проверки". Конечно это можно сделать и эффективнее - просмотрев сет целиком. Но будем помнить, что в реальной задаче проверки могут быть сложнее - затрагиваю отношения между столбцами, проверку ссылок на другие столбцы, итд.

Примерно код теста будет выглядеть как на вставке. Для проверки пока укажем прямую ссылку на файл.

import pytest
import pandas


SHEET_NAME = 'Config'
TABLE_OFFSET = 0
KEY_FIELD = 'Code'


def get_config_data():
    data = pandas.read_excel("<static path to file>", 
                             sheet_name=SHEET_NAME, 
                             skiprows=TABLE_OFFSET)
    return data

# Generate array of rows - to parametrize tests
try:
    config_data_records = [rec for rec in get_config_data().iterrows() if str(rec[1][KEY_FIELD]) != 'nan']
    config_indexes = [rec[0] for rec in config_data_records]
except:
    config_data_records = None
    config_indexes = None
  
@pytest.fixture()
def config_dataset():
    return get_config_data()

@pytest.fixture(params=config_data_records, ids=config_indexes)
def config_record(request):
    return request.param[1], request.param[0] + TABLE_OFFSET + 2  # Excel rows start with 1, plus caption, plus offset

  
@pytest.mark.tryfirst
def test_data_exists():
    get_config_data()

# Test applies for whole rowset
@pytest.mark.parametrize("column_name", [KEY_FIELD, 'TextData', 'Numbers'])
def test_data_has_column(config_dataset, column_name):
    assert column_name in config_dataset.columns

# Test applies for each row of the file
@pytest.mark.parametrize("column_name", [KEY_FIELD, 'TextData'])
def test_mandatory_value_is_missing(config_record, column_name):
    rec, line_no = config_record
    assert not any(rec[[column_name]].isna())

Тесты работают и ловят ошибки, где необходимо. Пока все хорошо.

Программный вызов pytest

Теперь запустим тесты программно. В документации у pytest это описано - можно делать.

files = ['<static testfile.py>']

pytest.main(files)

Сбор результатов тестирования

Чтобы собрать результаты запуска тестов и обработать придется сделать так, как рассказано здесь на stackoverflow. Придумывать нечего нового не будем - просто используем подсказку.

# Definition of ResultsCollector goes here

collector = ResultsCollector()
files = ['<static testfile.py>']

pytest.main(files, plugins=[collector])

# Process collected results

Передача имени проверяемого файла

Вроде все в порядке. Однако, пора начинать добавлять "динамики" - передать имя проверяемого файла внутрь теста.

Это можно сделать написав еще один небольшой плагин, который будет определять новый параметр командной строки с именем проверяемого файла и потом сохранять его в глобальную переменную - в пакет pytest

class FileRef:
    def __init__(self):
        self.file_name = None

    @pytest.hookimpl
    def pytest_addoption(self, parser):
        parser.addoption("--file", action="store")

    @pytest.hookimpl
    def pytest_configure(self, config):
        self.file_name = config.option.file
        pytest.data_file_name = config.option.file

Тогда запуск тестов будет выглядеть так:

file_to_test = "<path to excel file>"

collector = ResultsCollector()
file_ref = FileRef()
plugins = [collector, file_ref]

files = ['<static testfile.py>', '--file', file_to_test]

pytest.main(files, plugins=plugins)

# Process collected results

а в самом тесте останется заменить фиксированный путь на переменную:

...

def get_config_data():
    data = pandas.read_excel(pytest.data_file_name, # Now this will come from the parameter
                             sheet_name=SHEET_NAME, 
                             skiprows=TABLE_OFFSET)
    return data

...

Про использование параметра в тестах

Кстати, есть альтернативный подход в использовании переданных параметров, где конфигурация используется в фикстуре, как например описано здесь. Однако, для наших целей этот способ не подойдет.

По интернету ходит "дерево событий" pytest, которое перепечатывается из статьи в статью (одно из первых упоминаний, ссылки, перепечатки) . Это "дерево" хорошо иметь под рукой при разработке собственных плагинов.

Фикстуры "появляются" только на фазе "collection" (2) - на той же фазе происходит сбор тестов и их параметризация (в результате чего появляются "экземпляры тестов". Таким образом, заранее открыть файл чтобы считать его строки и подготовить массив для построчной параметризации не получится.

Доступ к файлу нужен уже на стадии "configure" (1).

Все - в сервис

Теперь можно начать упаковывать в сервис полученные тесты и компоненты для их запуска. Чтобы подготовить результаты тестирования (а точнее - "упавших" проверок), удобно объявить общие типы.

# common_types.py

from enum import Enum
from pydantic import BaseModel
from typing import List, Optional


class ConfigType(str, Enum):
    file_type_1 = 'file-type-1'
    file_type_2 = 'file-type-2'


class TestResult(BaseModel):
    id: str
    outcome: str
    rule: str
    line_no: Optional[int] = None
    field_name: Optional[str] = None
    ref: Optional[str] = None


class FileValidationResult(BaseModel):
    file_name: str
    result: int
    passed_cnt: int
    failed_cnt: int
    xfailed_cnt: int
    skipped_cnt: int
    total_duration_sec: float
    failed_tests: List[TestResult]
    version: str = None

Эти типы пригодятся в качестве финальной формы программной обработки результатов тестирования. Их можно также использовать для объявления параметров и результата вызова сервиса.

Сервис будет принимать файл и его ожидаемый тип, сохранять его во временный файл и передавать имя этого файла вместе с типом в процесс тестирования.

# main.py

...

@app.post('/validate_file', response_model=FileValidationResult)
async def validate_file(config_type: ConfigType,
                        file: UploadFile = File()
                        ):

    _, file_extension = os.path.splitext(file.filename)

    with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as _file:
        _content = await file.read()
        _file.write(_content)
        _tmp_file_name = _file.name

    res = validator.run_test(_tmp_file_name, config_type)
    res.file_name = file.filename
    res.version = __version__

    os.remove(_tmp_file_name)

    return res

На основании ожидаемого типа можно подбирать тесты для исполнения. Конечно более корректно складывать все тесты для определенного типа в общую папку и передавать в pytest ее целиком. А можно полагаться на связку имени файла и типа.

# validator.py

...

def run_test(file_name: str, file_type: ConfigType):
    
    ...
  
    _folder = file_type.value
    if os.path.isdir(_folder):
        # If we have a folder with the name matching file type - we take it
        files = [_folder]
    else:
        # If no folder defined - search files with names related to file type
        _ft = file_type.value.replace('-', '_')
        files = glob.glob(f'test_{_ft}*.py')

    # Add parameter for name of the file being tested
    files += ['--file', file_name]

    # and run found tests against the given file. Plugins will help.
    pytest.main(files, plugins=plugins)

    # Process results of tests based on the internal state of `collector` instance
    ...
    

Очевидное замечание про папку с файлами тестов против просто файлов тестов в корневой директории. В случае организации тестов в папки, при добавлении новых типов проверяемых файлов необходимо подправлять и Dockerfile. При добавлении файлов в корень проекта такого делать будет не нужно.

FROM python:3.11-slim

COPY requirements.txt /app/
WORKDIR /app/
RUN pip install --no-cache-dir -r requirements.txt

# Install application
COPY *.py /app/

RUN mkdir {file-type-1, file-type-2}

COPY file-type-1/*.py /app/file-type-1/
COPY file-type-2/*.py /app/file-type-2/

EXPOSE 8000

ENTRYPOINT ["uvicorn", "--host", "0.0.0.0", "--port", "8000", "--workers", "1", "main:app"]

И казалось бы все готово, но ...

Собранный таким образом сервис принимает файл, запускает тестирование, выдает результаты в ожидаемом формате.

Однако тестирование быстро покажет, что если несколько раз последовательно вызывать сервис подсовывать ему разные файлы - он будет упорно показывать результат проверки первого файла.

Причина "простая". Реализация pytest настолько глубоко зависит от динамического создания и кэширования экземпляров тестовых функций и фикстур, что повторный программный запуск в одном контексте исполнения становится просто невозможен. Об этом официальная документация сообщает - но в конце статьи.

Чтобы обойти это ограничение необходимо запускать сессию тестирования в отдельном процессе. Этот процесс должен возвращать результат тестирования для дальнейшей обработки.

Переупаковываем основную часть запуска тестов в компактную изолированную функцию и организовываем ее вызов в отдельном процессе. Для возврата результатов выполнения в виде экземпляра collector используем очередь, объявленную в вызывающем процессе.

# validator.py

...

import multiprocessing as mp

...

def _run(file_name: str, file_type: ConfigType, q):
    collector = ResultsCollector()
    file_ref = FileRef()
    plugins = [collector, file_ref]

    _folder = file_type.value
    if os.path.isdir(_folder):
        files = [_folder]
    else:
        _ft = file_type.value.replace('-', '_')
        files = glob.glob(f'test_{_ft}*.py')

    files += ['--file', file_name]

    pytest.main(files, plugins=plugins)

    q.put(collector)


def run_test(file_name: str, file_type: ConfigType):
    # Need to run tests into totally different process to make it not to "stuck"
    # with the first file

    q = mp.Queue()
    p = mp.Process(target=_run, args=(file_name, file_type, q))
    p.start()
    collector = q.get()
    p.join()

    # Process results of tests based on the internal state of `collector` instance
    ...

Теперь при повторных вызовах все работает как ожидалось - тесты учитывают изменяющийся вход, фикстуры перестраиваются, тесты исполняются.

Надо учитывать, однако, что если вдруг в данные исполнения заберутся несериализуемые данные (у меня такое получилось случайно), то процесс зависнет: данные не получится поместить в очередь, а вызывающий процесс будет ждать "посылки" и никогда не закончится.

Дополнение об асинхронности

В комментариях резонно указали на то, что текущий код может блокировать event loop асинхронного исполнения.

Исправляю:

# validator.py

...

import asyncio
import concurrent.futures

...

def _run(file_name: str,
         file_type: ConfigType
         ):
    collector = ResultsCollector()
    file_ref = FileRef()
    plugins = [collector, file_ref]

    _folder = file_type.value
    if os.path.isdir(_folder):
        files = [_folder]
    else:
        _ft = file_type.value.replace('-', '_')
        files = glob.glob(f'test_{_ft}*.py')

    files += ['--file', file_name]

    pytest.main(files, plugins=plugins)

    # Simply return collector
    return collector


async def run_test(file_name: str, file_type: ConfigType):
    loop = asyncio.get_running_loop()

    with concurrent.futures.ProcessPoolExecutor() as pool:
        collector = await loop.run_in_executor(pool, _run, file_name, file_type)

    ...

Получилось даже более компактно и красиво: сложности с возвратом данных из подпроцесса скрыты, а раньше приходилось отдельную очередь использовать.

Немного красоты pytest

Несмотря на значительную подготовительную работу, теперь процесс работает и можно наслаждаться мощью экосистемы pytest.

Зависимые тесты

Если в проверяемом файл не найдена необходимая страница - зачем проверять что то дальше и тратить время? pytest предоставляет возможность описать зависимость между тестами с помощью расширения pytest-dependency.

Так мы можем пометить ключевой тест, указать что его надо выполнять первым, и потом указать в других тестах что они зависят от ключевого теста. Если ключевой тест упадет - зависимые будут помечены как пропущенные.

# test_file.py

...

@pytest.mark.tryfirst
@pytest.mark.dependency()
def test_data_exists():
    get_config_data()


@pytest.mark.parametrize("column_name", [KEY_FIELD, 'TextData', 'Numbers'])
@pytest.mark.dependency(depends=["test_data_exists"])
def test_data_has_column(config_dataset, column_name):
    assert column_name in config_dataset.columns

...

Дополнительные атрибуты выполнения теста

Поскольку мы хотим отдавать пользователю по возможности точное указание того, где произошла ошибка и почему, нам необходим способ сохранять эту информацию в контексте выполнения теста.

Для этого в pytest предусмотрена стандартная фикстура record_property. Она позволяет сохранять в контексте результата теста пары ключ-значение.

# test_file.py
...

RE_KEY = re.compile('data\d+')


@pytest.mark.dependency(depends=["test_data_exists"])
def test_key_value_is_correct(config_record, record_property):
    rec, line_no = config_record
    column_name = KEY_FIELD

    record_property("line_no", line_no)
    record_property("field_name", column_name)

    val = rec[column_name]
    record_property("ref", f"{val}")

    # validation goes here
    assert RE_KEY.fullmatch(val)

В последствие их можно получить и обработать.

# validator.py

...

def run_test(file_name: str, file_type: ConfigType):
    ...
    # Test runn happens here. collector object holds results.
    ...
  
    failed_tests = []
    for report in collector.reports:
        if 'failed' in report.outcome:
            
            # Properties from record_property
            props = {r[0]: r[1] for r in report.user_properties}
            
            tst = TestResult(id=report.nodeid,
                             outcome=report.outcome,
                             rule="".join(report.head_line.split('test_')[-1].split('[')[0]),
                             
                             # Use properties to enrich response on error
                             line_no=props.get('line_no', -1),
                             field_name=str(props.get('field_name', '')),
                             ref=str(props.get('ref', '')))
            failed_tests.append(tst)

Финальный результат выдаваемый сервисом, таким образом, может содержать предельно точные указания что и где в файле не так:

{
  "file_name": "example_type_1.xlsx",
  "result": 1,
  "passed_cnt": 12,
  "failed_cnt": 1,
  "xfailed_cnt": 0,
  "skipped_cnt": 0,
  "total_duration_sec": 1.0997262001037598,
  "failed_tests": [
    {
      "id": "file-type-1/test_file.py::test_mandatory_value_is_missing[2-TextData]",
      "outcome": "failed",
      "rule": "mandatory_value_is_missing",
      "line_no": 4,
      "field_name": "TextData",
      "ref": ""
    }
  ],
  "version": "demo"
}

Заключение

Заставить pytest работать в "непривычных" условиях оказалось возможно, котя и потребовало нескольких итераций.

Плюсы решения:

  • возможность использовать мощную инфраструктуры pytest и опыт инженеров-тестировщиков для решения задачи проверки качества данных в файлах.

  • возможность через тесты определить и задокументировать правила проверки

Что еще можно улучшить

  • Большие накладные расходы на запуск теста в подпроцессе и собственно сборка тестов приводят к длительной проверке файла - в зависимости от сложности и количества тестов проверка может длится минуты. Это не очень критично в случае пакетного процесса - но не очень приятно.

  • В попытке сократить общее время проверки при большом количестве тестов пробовал использовать pytest-xdist для параллельного выполнения тестов, но, к сожалению не пока без результата - имя проверяемого файла не видно в параллельных подпроцессах.

Полная версия сервиса, рассмотренного в статье, с примерами фалов и соответствующих тестов выложена в репозиторий на GitHub.
От полноценно работающего сервиса эту версию отделяют только реальные тесты и включение авторизации.

Комментарии (7)


  1. conopus
    03.12.2023 08:25

    Порадовали подробности кухни pytest. Не всякий копнет так глубоко! Но, возможно стоило поискать специализированные фреймворки по качеству данных. Это только мое предположение. Сам я к этой области отношения не имею.


    1. serhit Автор
      03.12.2023 08:25

      Честно говоря, мы не использовали фреймворки по качеству данных.
      Более того в нашей постановке задачи до работы таких фреймворков мог не дойти ход, если пользователь подсунет файл, "покореженный креативом", в котором колонки и листы переименованы "потому, что так удобнее".


  1. dolfinus
    03.12.2023 08:25
    +1

    Смущает запуск синхронного validator.run_test внутри асинхронного endpoint, это заблокирует event loop


    1. serhit Автор
      03.12.2023 08:25

      Мда, все так... Надо копать глубже. Возможно найду как запустит внешний процесс и отдать управление обратно в event_loop. Беглый поиск результатов пока не дал.
      Если вдруг есть идеи как это сделать - поделитесь.


    1. serhit Автор
      03.12.2023 08:25

      Да, нашлось решение - получилось даже проще, чем изначально. Статью дополнил, репо обновил. Спасибо за комментарий!


  1. SkiffCMC
    03.12.2023 08:25

    Не питонист, но идея заюзать для валидации данный тестовый фреймворк крутая, пасиб!


    1. serhit Автор
      03.12.2023 08:25

      Надо сказать, что тут даже не совсем "валидация данных", а сначала "валидация структуры", потом только - данных. Если говорить про валидацию "структурно чистых" данных - возможно, мы бы смотрели на какие-то другие фреймворки, как предлагал комментатор выше.