В рамках проекта цифровой модернизации для одного из наших клиентов возникла задача миграции данных из одной модели хранения в другую. Для тестирования такого решения мы обратились к практикам BDD (Behaviour Driven Development) и виртуализации зависимостей с помощью docker контейнеров. В данном посте рассмотрен рецепт как можно организовать тестирование подобного решения с помощью pytest-bdd и testcontainers на python. Весь исходный код доступен по ссылке.
Об инфраструктуре
Как видно из диаграммы выше, источник данных живет в Azure. Миграция подразумевает перенос (копирование) актуальных данных из базы в Google Cloud сервис Datastore. API развертывается через Cloud Build как Cloud Run сервис.
Что мигрируем?
В ходе анализа схемы мы выделили ряд доменов, которые подлежат миграции в новое хранилище. Каждый домен представлен набором таблиц и не всегда данные должны оказаться в целевом хранилище целиком: порой это целые таблицы, а иногда определённая выборка атрибутов из неё.
В нашем примере будет использоваться домен Organisations. Информация об организациях разбросана по следующим таблицам:
Orgs (Org_ID, Org_Name, ...) - общие данные об организациях
DeOrgs (Org_ID, User_ID, DeOrg_TS) - удаленные организации
OrgIPValidate (Org_ID) - включена ли IP аутентификация для организации
OrgIPs (Org_ID, Org_IP) - whitelist IP адресов, связанных с организацией
В Google Datastore всё это будем хранить в одной сущности Organisations. Также, следует отметить, что IP адреса в исходном виде - это целые значения, но в Datastore они будут храниться в формате IP-адреса - A.B.C.D.
Можно оценить, что разработка такого решения сводится к написанию SQL запросов различного уровня сложности, каких-то дополнительных преобразований (случай IP-адреса) и корректному отображению данных.
Каким образом это можно тестировать?
В качестве одного из вариантов можно было бы рассмотреть покрытие миграции smoke тестами. Для этого пришлось бы на каждое изменение SQL запроса или другой логики развёртывать новую версию API, прогонять эти тесты, и, в случае падения, искать причины в логах. От инфраструктуры клиента в Azure (VPN + MSSQL) можно было бы отвязаться использованием Google Cloud SQL. Но, в любом случае, этот feedback loop не то, чего нам хотелось бы в силу его полной зависимости от Cloud-провайдера, что несёт за собой:
временные затраты на развёртывание
дополнительные ресурсы на поддержку инфраструктурного кода
ручное сканирование лога на предмет причины падения тестов
Проблему зависимости от Cloud можно решить, развернув setup локально. По сути, у нас здесь два ключевых компонента - это MSSQL и Google Datastore. Давайте рассмотрим как запустить эти вещи локально.
Локальный setup
На помощь приходит библиотека testcontainers. Это небольшая библиотека, позволяющая запускать docker контейнеры для тестирования. В состав уже входит набор некоторых готовых классов-обёрток для популярных решений. А так же имеется удобный API для собственных нужд. Взглянем на следующий код:
@fixture(scope="session", autouse=True)
def mssql_connection() -> Connection:
with SqlServerContainer(
"mcr.microsoft.com/mssql/server:2017-latest"
) as mssql:
engine = create_engine(mssql.get_connection_url())
db.schema.metadata.create_all(engine)
yield engine.connect()
Здесь, мы подготавливаем контейнер с MSSQL. Обратим внимание, что для fixture используется scope=session. По умолчанию, pytest использует scope=function, что предполагает вызов функции на каждый тест. Это можно было бы использовать, но запускать контейнер под каждый тест весьма расточительно. Хотя, в этом случае, можно не беспокоиться об изоляции тестовых данных друг от друга, т.к. каждый раз у нас будет свежий контейнер без данных. Но так как мы заинтересованы в том, чтобы сократить время выполнения всех тестов (а чем их больше, тем это чувствительнее), то будем использовать session. В результате контейнер будет создан лишь один раз перед запуском всех тестов. Но в таком случае возникает проблема изоляции данных. Оставлять данные после отработки теста нам не интересно, поэтому решать эту проблему будем удалением - запомним таблицы, которые заполнялись перед запуском теста и очистим после.
Для Datastore контейнера напишем свой класс. Отмечу, что Google предоставляет набор эмуляторов для некоторых своих сервисов. В список поддерживаемых сервисов входит и Datastore. Наш класс базируется на image от Google со всеми необходимыми зависимостями. Подготовим контейнер:
@fixture(scope="session", autouse=True)
def gds_client():
with DatastoreContainer() as ds:
yield ds.get_client()
Таким образом, перед запуском тестов у нас есть два готовых к работе контейнера. Можно приступать к тестам.
Тестирование
Рассмотрим подход к написанию тестов. BDD - это эволюция практики TDD, где вводится новый концепт behaviour. Мы не будем в деталях останавливаться здесь на особенностях этой практики, отметим лишь два важных момента:
мы по-прежнему остаёмся в рамках парадигмы Test First
тесты становятся своего рода документацией, к которой могут обращаться и не технические специалисты (в нашем проекте это было важно)
Два зайца одним выстрелом: тестируем код и пишем документацию.
Подход к реализации behaviours будет зависеть от выбора конкретной библиотеки. В целом, эти подходы похожи. Наш выбор упал на плагин для pytest который реализует подмножество языка Gherkin - pytest-bdd. Этот плагин позволяет описывать тесты на human-friendly языке с помощью зарезервированных синтаксических конструкций Given/When/Then и использовать pytest runner для запуска.
Тестирование нашей миграции подразумевает следующее:
наличие определенного набора данных в MSSQL (Given data in MSSQL)
непосредственный запуск миграции (When migration triggered)
валидация новых данных в Datastore (Then check data exists in Datastore)
Рассмотрим наш feature-файл с тестами:
Feature: Organisations migration path
Scenario: Test that a selected organisation can be migrated with ip addresses
Given Table Orgs has {"Org_ID": 45, "Org_Name": "Umbrella" }
And Table OrgIPValidate has { "Org_ID": 45 }
And Table OrgIPs has { "Org_ID": 45, "Org_IP": 2128374872 }
And Table OrgIPs has { "Org_ID": 45, "Org_IP": -2128374872 }
When the organisations migration is triggered for Org_ID=45
Then Organisation migrated as { "name": "Umbrella", "legacy_id": 45, "active": true, "ip_addresses": ["168.147.35.129", "88.108.220.126"], "ip_auth_enabled": true }
And 1 organisation migrated
Scenario: Test that a selected deleted organisation can be migrated
Given Table Orgs has {"Org_ID": 47, "Org_Name": "Globex" }
And Table DeOrgs has { "Org_ID": 47 }
When the organisations migration is triggered for Org_ID=47
Then Organisation migrated as { "name": "Globex", "legacy_id": 47, "active": false, "ip_addresses": null, "ip_auth_enabled": false }
And 1 organisation migrated
Здесь у нас два сценария:
В первом сценарии мы мигрируем организацию для которой включена IP-аутентификация и в whitelist добавлено два IP-адреса. Эти данные разбросаны по трём разным таблицам: Orgs, OrgIPValidate, OrgIPs. В конструкции Given мы указываем имя таблицы и используем JSON формат для описания данных. Этого достаточно для того, чтобы вставить запись в нужную таблицу:
@given(parsers.parse("table {table_name} has {payload}"))
def table_insert(
mssql_connection: Connection, context: Any, table_name: str, payload: str
) -> None:
obj = json.loads(payload)
mssql_connection.execute(table_refs[table_name].insert().values(obj))
Заполняем таблицы и запускаем миграцию. После чего, ожидаем создание сущности в Datastore - для этого необходимо указать содержимое сущности, для описания которого снова используется JSON:
@then(parsers.parse("organisation migrated as {payload}"))
def check_organisation_as(gds_client: datastore.Client, payload: str) -> None:
expected = json.loads(payload)
q = gds_client.query(
kind="Organisations", filters=[("legacy_id", "=", expected["legacy_id"])]
)
org = list(q.fetch())[0]
assert dict(org.items()) == expected
Как было отмечено выше, мы хотим зачищать данные после выполнения каждого теста. Для этого можно воспользоваться механизмом hooks которые добавлены в pytest-bdd - pytest_bdd_after_scenario, или добавить финальную фразу непосредственно в сценарий. Дело вкуса. Мы выбрали второй вариант, где перед зачисткой дополнительно проверяем количество перенесённых сущностей. В примере используется одна, но в реальности может быть и набор из нескольких, поэтому финальная конструкция параметризована количеством сущностей:
@then(parsers.parse("{count:d} organisations migrated"))
@then(parsers.parse("{count:d} organisation migrated"))
def check_organisations(
mssql_connection: Connection,
gds_client: datastore.Client,
context: Any,
count: int,
) -> None:
query = gds_client.query(kind="Organisations")
res = list(query.fetch())
assert len(res) == count
cleanup_sql(mssql_connection, context)
cleanup_datastore()
Во втором сценарии мигрируем организацию которая была удалена. Это фиксируется в другой таблице - DeOrgs. В перенесённой сущности это контролируется через свойство active.
Итоги
Этот подход позволяет нам эффективно тестировать миграции данных, при этом, сопровождая сервисы необходимой документацией. В конечном итоге мы избавились от зависимости к Cloud-провайдеру, что существенно снизит временные затраты на запуск тестов и сэкономит ресурсы. Надеюсь, статья окажется полезной при решении похожих задач.