Эта статья родилась из опыта использования GraphQL в проекте одного из крупнейших аэропортов РФ. Проект посвящен разработке системы по автоматизации обслуживания рейсов и управлению ресурсами аэропорта в реальном времени (MRMS система).
Проект реализован на базе микросервисной архитектуры, где модель данных аэропорта представлена в виде GraphQL API, а сервер, предоставляющий API, написан на java. Клиентами этого API являются не только web/mobile, но и сервисы на java, golang и python.
Статья написана как пошаговое руководство по созданию своего GraphQL клиента на python с нуля, где автор демонстрирует проблемы, возникающие на этом пути. Использовать реальную GraphQL схему аэропорта не представляется возможным, поэтому для наглядности будем использовать открытую схему github GraphQL API.
Содержание
Что нас ждет
Вот некоторые из задач, которые мы рассмотрим
как совершить GraphQL запрос и получить ответ от сервера?
асинхронные запросы к серверу;
как управлять своей кодовой базой запросов?
хранение ответов от сервера в виде типизированных классов;
Упоминание новых инструментов будет всегда сопровождаться ссылкой на оф. документацию. Но для комфортного чтения статьи желательно знать основы GraphQL. Так же будет плюсом иметь опыт работы с библиотеками requests, asyncio и aiohttp.
Исходный код из статьи можно найти тут. Код написан на python 3.11 с использованием poetry, типизирован и отформатирован с помощью black.
Подготовка
Для начала подготовимся к работе с github GraphQL API. Во-первых, настроим наше python окружение. Будем использовать poetry и следующую начальную структуру файлов
├── github_graphql_client/ <- Тут будет код нашего клиента
│ └── __init__.py
├── tests/ <- Тут будут тесты
│ └── __init__.py
├── scripts/ <- Тут будут различные скрипты для запуска
├── pyproject.toml <- Файл с настройками проекта
├── README.md
├── .env <- Тут будут всякие sensitive переменные
└── .gitignore
Конфиг pyproject.toml следующий
# `pyproject.toml` file
[tool.poetry]
name = "github_graphql_client"
version = "0.1.0"
description = "Github GraphQL client for Habr."
authors = ["FirstName SecondName <email>"]
readme = "README.md"
packages = [{include = "github_graphql_client"}]
[tool.poetry.dependencies]
python = "^3.11"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Во-вторых, Github GraphQL endpoint находится по адресу https://api.github.com/graphql. Чтобы воспользоваться API — нужно получить токен. Подробности про выпуск токена описаны тут: Github - Forming calls with GraphQL. В файле .env перечислим следующие параметры
GITHUB_TOKEN=<YOUR_TOKEN>
GITHUB_GRAPHQL_ENDPOINT=https://api.github.com/graphql
Для работы с файлом .env будем использовать пакет python-dotenv. Добавим его в качестве нашей первой зависимости
$ poetry add python-dotenv
Creating virtualenv blah-blah-py3.11 in /home/blah-blah/pypoetry/virtualenvs
Using version ^1.0.0 for python-dotenv
Updating dependencies
Resolving dependencies... (0.1s)
Package operations: 1 install, 0 updates, 0 removals
• Installing python-dotenv (1.0.0)
Writing lock file
После выполнения команды poetry add появится файл poetry.lock, а так же наша зависимость будет добавлена в pyproject.toml
# `pyproject.toml` file
# ...
[tool.poetry.dependencies]
python = "^3.11"
python-dotenv = "^1.0.0"
# ...
И, наконец, запустим poetry install
для установки всех зависимостей.
Первый запрос
Основное, что должен уметь наш клиент — подключаться к серверу. Обычно GraphQL работает по протоколу HTTP/HTTPS через единственный POST, который ожидает на входе json с полями query: str
и variables: dict[str, Any]
. Так это работает и в github GraphQL API. Схему github GraphQL API можно посмотреть тут. Для начала спроектируем класс, который будет отвечать непосредственно за соединение с сервером и получение данных. Добавим несколько новых файлов
$ mkdir github_graphql_client/transport
$ touch github_graphql_client/transport/__init__.py github_graphql_client/transport/base.py
В файле github_graphql_client/transport/base.py определим BaseTransport
# `github_graphql_client/transport/base.py` file
from typing import Any
class BaseTransport:
"""An abstract transport."""
def execute(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
"""Execute GraphQL query."""
raise NotImplementedError
def connect(self) -> None:
"""Establish a session with the transport."""
raise NotImplementedError
def close(self) -> None:
"""Close a session."""
raise NotImplementedError
Любой класс типа Transport должен наследоваться от BaseTransport. Для этого необходимо реализовать три метода
connect — метод для открытия соединения с сервером;
close — метод для закрытия соединения;
execute — метод для выполнения запроса query с переменными variables;
Так, как GraphQL запрос есть обычный POST запрос — клиент может быть реализован с помощью пакета requests. Для начала добавим зависимость
$ poetry add requests
В новый файл github_graphql_client/transport/requests.py добавим следующий код
# `github_graphql_client/transport/requests.py` file
from typing import Any, Optional
import requests as r
from github_graphql_client.transport.base import BaseTransport
class RequestsTransport(BaseTransport):
"""The transport based on requests library."""
session: Optional[r.Session]
def __init__(self, endpoint: str, token: str, **kwargs: Any) -> None:
self.endpoint = endpoint
self.token = token
self.auth_header = {"Authorization": f"Bearer {self.token}"}
self.session = None
def connect(self) -> None:
"""Start a `requests.Session` connection."""
if self.session is None:
self.session = r.Session()
else:
raise Exception("Session already started")
def close(self) -> None:
"""Closing `requests.Session` connection."""
if self.session is not None:
self.session.close()
self.session = None
def execute(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
"""Execute GraphQL query."""
if self.session is None:
raise Exception(f"RequestsTransport session not connected")
post_args = {
"headers": self.auth_header,
"json": {"query": query, "variables": variables},
}
post_args["headers"]["Content-Type"] = "application/json"
response = self.session.request("POST", self.endpoint, **post_args)
result = response.json()
return result.get("data")
В данной реализации
метод connect создает объект requests.Session если он не был создан ранее;
метод close закрывает соединение для объекта requests.Session;
метод execute отправляет с помощью объекта requests.Session обычный POST запрос;
Давайте проверим, что с помощью этого класса мы уже можем получить данные. Отправим query, который должен вернуть несколько первых завершенных issues из выбранного github репозитория. Для примера, рассмотрим новый проект автора pydantic - FastUI. Хранить запросы будем в отдельной папке
$ mkdir github_graphql_client/queries
$ touch github_graphql_client/queries/__init__.py github_graphql_client/queries/repository.py
# `github_graphql_client/queries/repository.py` file
repository_issues_query = """
query {
repository(owner:"pydantic", name:"FastUI") {
issues(last:2, states:CLOSED) {
edges {
node {
title
url
}
}
}
}
}
"""
Для удобства добавим скрипт scripts/run.py
# `scripts/run.py` file
import os
import time
from typing import Any
from dotenv import load_dotenv
from github_graphql_client.client.requests_client import RequestsClient
from github_graphql_client.queries.repository import repository_issues_query
load_dotenv() # take environment variables from .env
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
GITHUB_GRAPHQL_ENDPOINT = os.environ.get("GITHUB_GRAPHQL_ENDPOINT")
def main():
transport = RequestsTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT, token=GITHUB_TOKEN
)
transport.connect()
data = transport.execute(
query=repository_issues_query, variables={},
)
print(data)
transport.close()
if __name__ == "__main__":
main()
После запуска скрипта мы получим примерно следующее
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
Проект активно живет, поэтому сейчас issues будут другими.
Таймаут
Хорошо, если вы понимаете, сколько должен выполняться ваш запрос. Процитируем одну известную статью
Не ставить таймаут на задачу — это зло. Это значит, что вы не понимаете, что происходит в задаче, как должна работать бизнес-логика.
Добавим таймаут в RequestsTransport
# `github_graphql_client/transport/requests.py` file
...
class RequestsTransport(BaseTransport):
"""The transport based on requests library."""
DEFAULT_TIMEOUT: int = 1
session: Optional[r.Session]
def __init__(self, endpoint: str, token: str, **kwargs: Any) -> None:
...
self.timeout = kwargs.get("timeout", RequestsTransport.DEFAULT_TIMEOUT)
...
def execute(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
...
post_args = {
"headers": self.auth_header,
"json": {"query": query, "variables": variables},
"timeout": self.timeout,
}
...
Проверим наш код указав очень маленький (для github GraphQL API) таймаут
# `scripts/run.py` file
...
def main():
transport = RequestsTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT,
token=GITHUB_TOKEN,
timeout=0.0001,
)
...
...
Запустив scripts/run.py, мы получим следующее
$ python3 scripts/run.py
...
requests.exceptions.ConnectTimeout: HTTPSConnectionPool(host='api.github.com', port=443): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f5510137d50>, 'Connection to api.github.com timed out. (connect timeout=0.0001)'))
По аналогии вы можете добавить свои настройки для requests, а мы пойдем дальше.
Синхронный клиент
Создадим клиент, который научим работать с BaseTransport. После выполнения команд
$ mkdir github_graphql_client/client
$ touch github_graphql_client/client/__init__.py github_graphql_client/client/sync_client.py
в файл github_graphql_client/client/sync_client.py добавим следующий код
# `github_graphql_client/client/sync_client.py` file
from typing import Any
from github_graphql_client.transport.base import BaseTransport
class SyncGraphQLClient:
"""Sync GraphQL client based on `BaseTransport` transport."""
transport: BaseTransport
def __init__(self, transport: BaseTransport) -> None:
self.transport = transport
def __enter__(self):
self.connect_sync()
return self
def __exit__(self, *args):
self.close_sync()
def connect_sync(self) -> None:
"""Connect to `self.transport`."""
self.transport.connect()
def close_sync(self) -> None:
"""Close `self.transport` connection."""
self.transport.close()
def execute_sync(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
return self.transport.execute(query, variables, **kwargs)
Класс SyncGraphQLClient умеет запускать сессию для произвольного класса BaseTransport и получать результаты запросов опуская детали реализации самого запроса.
метод connect_sync создает соединение через self.transport;
метод close_sync закрывает соединение через self.transport;
метод execute_sync получает данные с сервера через self.transport;
методы __enter__ и __exit__ предназначены для того, чтобы запускать клиент в контекстном менеджере и не забывать закрыть соединение;
Проверим, что это работает
# `scripts/run.py` file
...
from github_graphql_client.client.sync_client import SyncGraphQLClient
...
def main():
transport = RequestsTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT,
token=GITHUB_TOKEN,
)
with SyncGraphQLClient(transport=transport) as client:
data = client.execute_sync(
query=repository_issues_query, variables={},
)
print(data)
...
Запустив scripts/run.py, получим тот же результат
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
Слово sync в названии SyncGraphQLClient не случайно. Немного позже у нас появится AsyncGraphQLClient, и мы объединим их в один GraphQLClient.
Несколько запросов
Давайте внутри одной сессии выполним несколько GraphQL запросов. Для этого изменим наш запрос так, чтобы иметь возможность указать название желаемого репозитория
# `github_graphql_client/queries/repository.py` file
def get_repository_issues_query(owner: str, name: str) -> str:
return """query {
repository(owner:"%s", name:"%s") {
issues(last:2, states:CLOSED) {
edges {
node {
title
url
}
}
}
}
}
""" % (
owner,
name,
)
Наш скрипт scripts/run.py будет выглядеть следующим образом
# `scripts/run.py` file
...
def main():
transport = RequestsTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT,
token=GITHUB_TOKEN,
)
with SyncGraphQLClient(transport=transport) as client:
data = client.execute_sync(
query=get_repository_issues_query("pydantic", "FastUI"),
variables={},
)
print(data)
data = client.execute_sync(
query=get_repository_issues_query("pydantic", "pydantic"),
variables={},
)
print(data)
data = client.execute_sync(
query=get_repository_issues_query("pydantic", "pydantic-core"),
variables={},
)
print(data)
...
После запуска получим примерно следующее
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
{'repository': {'issues': {'edges': [{'node': {'title': "__init__.cpython-311-darwin.so is an incompatible architecture (have 'x86_64', need 'arm64') in M1 mac mini", 'url': 'https://github.com/pydantic/pydantic/issues/8396'}}, {'node': {'title': 'Override class used in annotations', 'url': 'https://github.com/pydantic/pydantic/issues/8408'}}]}}}
{'repository': {'issues': {'edges': [{'node': {'title': '2.14.4 release upload failed', 'url': 'https://github.com/pydantic/pydantic-core/issues/1082'}}, {'node': {'title': "(????) `ValidationError` can't be instantiated", 'url': 'https://github.com/pydantic/pydantic-core/issues/1115'}}]}}}
Асинхронный клиент
В предыдущем примере мы ожидали ответ на первый запрос и только после этого отправляли второй запрос. Зачем нам тратить столько времени? Давайте реализуем асинхронное соединение.
В файле github_graphql_client/transport/base.py добавим класс BaseAsyncTransport по аналогии с BaseTransport
# `github_graphql_client/transport/base.py` file
...
class BaseAsyncTransport:
"""An abstract async transport."""
async def execute(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
"""Execute GraphQL query."""
raise NotImplementedError
async def connect(self) -> None:
"""Establish a session with the transport."""
raise NotImplementedError
async def close(self) -> None:
"""Close a session."""
raise NotImplementedError
Для примера реализуем асинхронный backend на aiohttp. Добавим этот пакет в проект
$ poetry add aiohttp
В новом файле github_graphql_client/transport/aiohttp.py создадим класс AIOHTTPTransport
# `github_graphql_client/transport/aiohttp.py` file
from typing import Any, Optional
import aiohttp
from github_graphql_client.transport.base import BaseAsyncTransport
class AIOHTTPTransport(BaseAsyncTransport):
"""The transport based on aiohttp library."""
DEFAULT_TIMEOUT = 1
session: Optional[aiohttp.ClientSession]
def __init__(self, endpoint: str, token: str, **kwargs: Any) -> None:
self.endpoint = endpoint
self.token = token
self.auth_header = {"Authorization": f"Bearer {self.token}"}
self.timeout = kwargs.get("timeout", AIOHTTPTransport.DEFAULT_TIMEOUT)
self.session = None
async def connect(self) -> None:
"""Coroutine which will create an aiohttp ClientSession() as self.session."""
if self.session is None:
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.timeout),
headers=self.auth_header,
)
else:
raise Exception(f"AIOHTTPTransport is already connected")
async def close(self) -> None:
"""Coroutine which will close the aiohttp session."""
if self.session is not None:
await self.session.close()
self.session = None
async def execute(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
"""Execute GraphQL query with aiohttp."""
if self.session is None:
raise Exception(f"AIOHTTPTransport session not connected")
async with self.session.post(
self.endpoint,
json={"query": query, "variables": variables},
) as response:
data = await response.json()
return data.get("data")
Тут все очень похоже на RequestsTransport
connect — это корутина, которая создает сессию с помощью aiohttp.ClientSession;
close — корутина, которая завершает aiohttp.ClientSession;
execute — корутина, которая с помощью открытой aiohttp.ClientSession асинхронно получает данные с сервера;
для правильной установки таймаута используем aiohttp.ClientTimeout;
Проверим асинхронную работу AIOHTTPTransport с помощью asyncio
# `scripts/run.py` file
import asyncio
...
async def run_aiohttp_transport():
transport = AIOHTTPTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT,
token=GITHUB_TOKEN,
)
await transport.connect()
data = await transport.execute(
query=get_repository_issues_query("pydantic", "FastUI"),
variables={},
)
print(data)
await transport.close()
def amain():
asyncio.run(run_aiohttp_transport())
if __name__ == "__main__":
amain()
Результат нам уже знаком
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
Создадим теперь асинхронный клиент для работы с BaseAsyncTransport
# `github_graphql_client/client/async_client.py` file`
from typing import Any
from github_graphql_client.transport.base import BaseAsyncTransport
class AsyncGraphQLClient:
"""Async GraphQL client based on `BaseAsyncTransport` transport."""
transport: BaseAsyncTransport
def __init__(self, transport: BaseAsyncTransport) -> None:
self.transport = transport
async def __aenter__(self):
await self.connect_async()
return self
async def __aexit__(self, *args):
await self.close_async()
async def connect_async(self) -> None:
"""Connect to `self.transport`."""
await self.transport.connect()
async def close_async(self) -> None:
"""Close `self.transport` connection."""
await self.transport.close()
async def execute_async(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
return await self.transport.execute(query, variables, **kwargs)
Тут все почти так же, как в SyncGraphQLClient, но асинхронно. Проверим
# `scripts/run.py` file
...
async def amain():
transport = AIOHTTPTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT,
token=GITHUB_TOKEN,
)
async with AsyncGraphQLClient(transport=transport) as client:
data = await client.execute_async(
query=get_repository_issues_query("pydantic", "FastUI", last=2),
variables={},
)
print(data)
def main():
asyncio.run(amain())
if __name__ == "__main__":
main()
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
Теперь перепишем последний пример предыдущего раздела с помощью нового асинхронного клиента
# `scripts/run.py` file
...
def check_execute(fn):
async def wrapper(*args, **kwargs):
tic = time.perf_counter()
result = await fn(*args, **kwargs)
toc = time.perf_counter()
print(f"Duration for {fn.__name__} is {toc - tic:0.4f} seconds")
return result
return wrapper
@check_execute
async def execute(client, query: str, variables: dict[str, Any]) -> None:
data = await client.execute_async(query, variables)
print(data)
@check_execute
async def amain():
transport = AIOHTTPTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT,
token=GITHUB_TOKEN,
)
async with AsyncGraphQLClient(transport=transport) as client:
async with asyncio.TaskGroup() as tg:
tg.create_task(
execute(
client,
query=get_repository_issues_query(
"pydantic", "FastUI",
),
variables={},
)
)
tg.create_task(
execute(
client,
query=get_repository_issues_query(
"pydantic", "pydantic",
),
variables={},
)
)
tg.create_task(
execute(
client,
query=get_repository_issues_query(
"pydantic", "pydantic-core",
),
variables={},
)
)
def main():
asyncio.run(amain())
...
В контекстном менеджере клиента AsyncGraphQLClient мы асинхронно запускаем несколько задач execute. Это реализовано с помощью asyncio.TaskGroup. Декоратор check_execute выводит для нас время выполнения
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
Duration for execute is 0.5999 seconds
{'repository': {'issues': {'edges': [{'node': {'title': '2.14.4 release upload failed', 'url': 'https://github.com/pydantic/pydantic-core/issues/1082'}}, {'node': {'title': "(????) `ValidationError` can't be instantiated", 'url': 'https://github.com/pydantic/pydantic-core/issues/1115'}}]}}}
Duration for execute is 0.5987 seconds
{'repository': {'issues': {'edges': [{'node': {'title': "__init__.cpython-311-darwin.so is an incompatible architecture (have 'x86_64', need 'arm64') in M1 mac mini", 'url': 'https://github.com/pydantic/pydantic/issues/8396'}}, {'node': {'title': 'Override class used in annotations', 'url': 'https://github.com/pydantic/pydantic/issues/8408'}}]}}}
Duration for execute is 0.5991 seconds
Duration for amain is 0.6005 seconds
Теперь мы получили, что время выполнения функции amain почти равно времени выполнения самого длительного запроса.
Создаем клиент
Пора объединить SyncGraphQLClient и AsyncGraphQLClient в один GraphQLClient, чтобы использовать один интерфейс. Создадим файл github_graphql_client/client/client.py
# `github_graphql_client/client/client.py` file
import asyncio
from typing import Any, Union
from github_graphql_client.transport.base import (
BaseAsyncTransport,
BaseTransport,
)
from .async_client import AsyncGraphQLClient
from .sync_client import SyncGraphQLClient
class GraphQLClient(SyncGraphQLClient, AsyncGraphQLClient):
"""GraphQL client."""
transport: Union[BaseTransport, BaseAsyncTransport]
def __init__(
self, transport: Union[BaseTransport, BaseAsyncTransport]
) -> None:
super().__init__(transport)
async def _execute_async(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
async with self as client:
data = await client.execute_async(
query,
variables,
**kwargs,
)
return data
async def _execute_batch_async(
self,
queries: list[str],
variables: list[dict[str, Any]],
**kwargs: Any,
) -> list[dict[str, Any]]:
tasks = []
async with self as client:
for i in range(len(queries)):
query = queries[i]
vars = variables[i]
tasks.append(client.execute_async(query, vars, **kwargs))
result_data = await asyncio.gather(*tasks)
return list(result_data)
def execute(
self, query: str, variables: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
"""Execute GraphQL query."""
if isinstance(self.transport, BaseAsyncTransport):
return asyncio.run(self._execute_async(query, variables, **kwargs))
else:
with self as client:
return client.execute_sync(query, variables, **kwargs)
def execute_batch(
self,
queries: list[str],
variables: list[dict[str, Any]],
**kwargs: Any,
) -> list[dict[str, Any]]:
"""Execute a batch of GraphQL queries."""
if isinstance(self.transport, BaseAsyncTransport):
return asyncio.run(
self._execute_batch_async(queries, variables, **kwargs)
)
else:
results = []
with self as client:
for i in range(len(queries)):
query = queries[i]
vars = variables[i]
res = client.execute_sync(query, vars, **kwargs)
results.append(res)
return results
Два основных метода для использования: execute и execute_batch. Внутри эти методы определяют какой тип у self.transport
, а после вызывают код из наших прошлых примеров. Попробовать все вариант можно с помощью следующего кода
# `scripts/run.py` file
...
def main():
transport_r = RequestsTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT,
token=GITHUB_TOKEN,
)
transport_a = AIOHTTPTransport(
endpoint=GITHUB_GRAPHQL_ENDPOINT,
token=GITHUB_TOKEN,
)
client_r = GraphQLClient(transport=transport_r)
client_a = GraphQLClient(transport=transport_a)
q_1 = get_repository_issues_query(
"pydantic",
"FastUI",
)
q_2 = get_repository_issues_query(
"pydantic",
"pydantic",
)
q_3 = get_repository_issues_query(
"pydantic",
"pydantic-core",
)
data_r1 = client_r.execute(q_1, {})
data_a1 = client_a.execute(q_1, {})
data_r3 = client_r.execute_batch([q_1, q_2, q_3], [{}, {}, {}])
data_a3 = client_a.execute_batch([q_1, q_2, q_3], [{}, {}, {}])
...
Поздравляю, мы только что написали свою упрощенную версию библиотеки gql. gql основана на той же идее, но имеет много дополнительной обвязки
различные реализации backend: requests, async, websockets;
различные проверки, которые мы осознанно опускали;
валидация запросов;
проверка ошибки запроса от сервера;
Пробуем gql
Мы не будем приводить всю документацию gql, но рассмотрим один пример. Выполним наш запрос с помощью gql. Установим пакет следующим образом
$ poetry add "gql[all]"
И запустим пример из документации для нашего запроса
# `scripts/run.py` file
...
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
...
def check_execute(fn):
def wrapper(*args, **kwargs):
tic = time.perf_counter()
result = fn(*args, **kwargs)
toc = time.perf_counter()
print(f"Duration for {fn.__name__} is {toc - tic:0.4f} seconds")
return result
return wrapper
@check_execute
def main():
transport = AIOHTTPTransport(
url=GITHUB_GRAPHQL_ENDPOINT,
headers={"Authorization": f"Bearer {GITHUB_TOKEN}"},
)
client = Client(transport=transport, fetch_schema_from_transport=True)
# Provide a GraphQL query
query = gql(get_repository_issues_query("pydantic", "FastUI", last=2))
result = client.execute(query)
print(result)
...
Результат будет примерно следующим
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
Duration for main is 8.5059 seconds
Ого, 8 секунд! Но запрос тот же самый. Значит время уходит не на ожидание ответа от сервера, а на накладные расходы самой библиотеки. Эти накладные расходы связаны с параметром fetch_schema_from_transport
.
Пакет gql базируется на библиотеке graphql-core, которая является python реализацией проекта GraphQL.js. И если параметр fetch_schema_from_transport
имеет значение True, то во время первого запроса клиент фетчит схему и билдит ее в формат graphql_core.GraphQLSchema.
На момент написания статьи схема github GraphQL API содержит 61602 строки. Т.е. все 8 секунд уходят именно на анализ схемы, которая, конечно, довольно большая. Если fetch_schema_from_transport=False
, то время выполнения запроса будет совпадать с нашим клиентом.
Работа с запросами
Только что мы увидели проблему фетчинга большой схемы. Фетчинг происходит для того, чтобы произвести валидацию запросов на клиенте. Если зафиксировать схему, с которой мы работаем, то валидацию можно перенести на уровень тестов.
Валидация запросов
Установим pytest
$ poetry add pytest --group test
Сохраним GraphQL схему локально в файле tests/data/schema.docs.graphql. Тест для валидации нашего запроса repository будет следующим
# `tests/test_queries_validation.py` file
from pathlib import Path
from graphql import Source, parse, build_schema, validate, GraphQLSchema
from github_graphql_client.queries.repository import get_repository_issues_query
SCHEMA_FILENAME = Path(__file__).parent / Path("data/schema.docs.graphql")
def get_schema() -> GraphQLSchema:
with SCHEMA_FILENAME.open("r", encoding="utf8") as f:
schema_str = f.read()
return build_schema(schema_str)
schema = get_schema()
def test_repository_issues_query():
query = get_repository_issues_query("pydantic", "FastUI")
document = parse(Source(query))
validation_errors = validate(schema, document)
if validation_errors:
raise validation_errors[0]
Что происходит
с помощью библиотеки graphql-core создаем объект
schema: graphql.GraphQLSchema
используя схему в формате строки;парсим запрос с помощью метода graphql.parse;
используем метод
graphql.validate(schema, document)
для валидации запроса;
Если использовать gql, то gql.Client предоставляет готовый метод валидации gql.Client.validate.
Для примера, допустим ошибку в названии поля issues внутри нашего запроса и запустим тест
# Файл `github_graphql_client/queries/repository.py`
repository_issues_query = """
...
repository(owner:"pydantic", name:"FastUI") {
issuess(last:2, states:CLOSED) {
...
"""
Тест скажет нам примерно следующее
document = DocumentNode at 0:183
def validate(self, document: DocumentNode):
""":meta private:"""
assert (
self.schema
), "Cannot validate the document locally, you need to pass a schema."
validation_errors = validate(self.schema, document)
if validation_errors:
> raise validation_errors[0]
E graphql.error.graphql_error.GraphQLError: Cannot query field 'issuess' on type 'Repository'. Did you mean 'issues' or 'issue'?
E
E GraphQL request:4:5
E 3 | repository(owner:"pydantic", name:"FastUI") {
E 4 | issuess(last:2, states:CLOSED) {
E | ^
E 5 | edges {
GraphQL variables
Мы еще ни разу не использовали GraphQL variables. Рассмотрим следующий запрос marketplaceCategories к github GraphQL API
"""
Get alphabetically sorted list of Marketplace categories
"""
marketplaceCategories(
"""
Exclude categories with no listings.
"""
excludeEmpty: Boolean
"""
Returns top level categories only, excluding any subcategories.
"""
excludeSubcategories: Boolean
"""
Return only the specified categories.
"""
includeCategories: [String!]
): [MarketplaceCategory!]!
Добавим его в github_graphql_client/queries/
# `github_graphql_client/queries/marketplaceCategories.py` file
def get_marketplace_categories(
exclude_empty: bool,
exclude_subcategories: bool,
include_categories: list[str],
) -> str:
return """query {
marketplaceCategories(excludeEmpty: %s, excludeSubcategories: %s, includeCategories: %s) {
id
description
}
}""" % (
str(exclude_empty).lower(),
str(exclude_subcategories).lower(),
str(include_categories).replace("'", '"'),
)
Чтобы создать корректную строку с GraphQL запросом с помощью форматирования строк, нам необходимо
преобразовать булево значение True/False в строку true/false;
преобразовать массив строк ['1', '2', '3'] в строку '["1", "2", "3"]';
Бывают и более сложные кейсы, когда нам нужен массив enum значений. Т.е. массив ["A", "B"] необходимо преобразовать в строку '[A, B]'. Чтобы этого избежать эти преобразования и нужны GraphQL variables
# `github_graphql_client/queries/marketplaceCategories.py` file
from typing import Any
def get_marketplace_categories(
exclude_empty: bool,
exclude_subcategories: bool,
include_categories: list[str],
) -> tuple[str, dict[str, Any]]:
variables = {
"excludeEmpty": exclude_empty,
"excludeSubcategories": exclude_subcategories,
"includeCategories": include_categories,
}
query = """query(
$excludeEmpty: Boolean
$excludeSubcategories: Boolean
$includeCategories: [String!]
) {
marketplaceCategories(
excludeEmpty: $excludeEmpty,
excludeSubcategories: $excludeSubcategories,
includeCategories: $includeCategories
) {
id
description
}
}"""
return query, variables
Теперь наша функция get_marketplace_categories возвращает строку с запросом и словарь variables. Не забудем добавить тест
# `tests/test_queries_validation.py` file
...
from github_graphql_client.queries.marketplaceCategories import \
get_marketplace_categories
...
def test_get_marketplace_categories():
query, _ = get_marketplace_categories(True, False, ["1", "2", "3"])
document = parse(Source(query))
validation_errors = validate(schema, document)
if validation_errors:
raise validation_errors[0]
Пакет graphql-query
Какие еще могут возникнуть проблему при управлении GraphQL запросами
шарить повторяющиеся куски запроса между разными запросами;
использовать фрагменты и инлайн-фрагменты;
использовать несколько GraphQL запросов внутри одного;
использовать алиасы для имени запроса;
Чем больше становится объем запросов в нашем проекте, тем сложнее всем этим управлять. По этим причинам была разработана библиотека graphql-query. Она предоставляет возможность генерации валидных строк с GraphQL запросами используя python классы. Генерация основана на библиотеке jinja2.
Документацию graphql-query можно посмотреть тут: https://denisart.github.io/graphql-query/. Так же есть статья на habr с пересказом документации на русском языке.
Для демонстрации перепишем наши текущие запросы используя graphql-query
$ poetry add graphql-query
Структура того, как устроен запрос
Создаем объект graphql_query.Operation, который есть наш запрос;
Указываем тип для
graphql_query.Operation: query, mutation, subscription
;Указываем имя graphql_query.Operation. Это шаг не обязательный, но является хорошей практикой. Сервер сможет по названию отслеживать метрики этого запроса именно от вашего сервиса;
Добавляем в graphql_query.Operation все необходимые variables в формате graphql_query.Variable;
Сами запросы создаем через graphql_query.Query;
У graphql_query.Query указываем имя, список аргументов как массив graphql_query.Argument и список желаемых полей.
Поля, переменные, аргументы задаются специальными классами. А значит их легко шарить между разными запросами. Если у вас в проекте всего один запрос, то такой подход может усложнить разработку, потому что кода становится больше.
Пример запроса marketplaceCategories в формате graphql-query
# `github_graphql_client/queries/marketplaceCategories.py` file
from typing import Any
from graphql_query import Operation, Argument, Variable, Field, Query
var_exclude_empty = Variable(name="excludeEmpty", type="Boolean")
var_exclude_subcategories = Variable(
name="excludeSubcategories", type="Boolean"
)
var_include_categories = Variable(name="includeCategories", type="[String!]")
def get_marketplace_categories(
exclude_empty: bool,
exclude_subcategories: bool,
include_categories: list[str],
) -> tuple[str, dict[str, Any]]:
marketplace_categories_query = Query(
name="marketplaceCategories",
arguments=[
Argument(name="excludeEmpty", value=var_exclude_empty),
Argument(
name="excludeSubcategories",
value=var_exclude_subcategories,
),
Argument(name="includeCategories", value=var_include_categories),
],
fields=["id", "description"],
)
operation = Operation(
type="query",
name="getMarketplaceCategories",
variables=[
var_exclude_empty,
var_exclude_subcategories,
var_include_categories,
],
queries=[marketplace_categories_query],
)
variables = {
var_exclude_empty.name: exclude_empty,
var_exclude_subcategories.name: exclude_subcategories,
var_include_categories.name: include_categories,
}
return operation.render(), variables
Вызов operation.render()
вернет нам следующий результат
query getMarketplaceCategories(
$excludeEmpty: Boolean
$excludeSubcategories: Boolean
$includeCategories: [String!]
) {
marketplaceCategories(
excludeEmpty: $excludeEmpty
excludeSubcategories: $excludeSubcategories
includeCategories: $includeCategories
) {
id
description
}
}
Не забываем убедиться, что тест test_get_marketplace_categories все так же завершается успешно. Остальные примеры использования (включая graphql_query.Fragment и graphql_query.InlineFragment) можно посмотреть в документации.
Генерация модели данных
Давайте запросим 20 последних открытых issues библиотеки pydantic. Немного обновим наш запрос repository
# `github_graphql_client/queries/repository.py` file
...
var_issue_state = Variable(name="IssueState", type="[IssueState!]")
...
def get_repository_issues_query(
owner: str,
name: str,
last: int,
state: str,
) -> tuple[str, dict[str, Any]]:
...
operation = Operation(
...
variables=[var_owner, var_name, var_last, var_issue_state],
...
)
return operation.render(), {
var_owner.name: owner,
var_name.name: name,
var_last.name: last,
var_issue_state.name: state,
}
Результат запроса get_repository_issues_query("pydantic", "pydantic", 20, "OPEN")
будет примерно следующим
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': "Can't use config keyword argument with TypeAdapter.__init__ on stdlib dataclass", 'url': 'https://github.com/pydantic/pydantic/issues/8326'}}, {'node': {'title': 'Constructor for model with `Json[list[int]]` field should accept `list[int]`, like mypy already expects', 'url': 'https://github.com/pydantic/pydantic/issues/8336'}}, {'node': {'title': 'JSON serialization issue with ipaddress classes as alternative', 'url': 'https://github.com/pydantic/pydantic/issues/8343'}}, {'node': {'title': 'Indeterminate: LookupError when generating custom schema output', 'url': 'https://github.com/pydantic/pydantic/issues/8359'}}, {'node': {'title': 'Implement more clear warning/error when using constraints on compound types', 'url': 'https://github.com/pydantic/pydantic/issues/8362'}}, {'node': {'title': 'Magic validation methods needs to be documented for version 2', 'url': 'https://github.com/pydantic/pydantic/issues/8374'}}, {'node': {'title': 'More consistent and intuitive `alias` behavior for validation and serialization', 'url': 'https://github.com/pydantic/pydantic/issues/8379'}}, {'node': {'title': 'Common mistakes docs section', 'url': 'https://github.com/pydantic/pydantic/issues/8380'}}, {'node': {'title': 'Deprecate `update_json_schema` function', 'url': 'https://github.com/pydantic/pydantic/issues/8381'}}, {'node': {'title': 'coerce_numbers_to_str needs a per-field variant', 'url': 'https://github.com/pydantic/pydantic/issues/8383'}}, {'node': {'title': 'Bus error when using custom type impl with SQLAlchemy ', 'url': 'https://github.com/pydantic/pydantic/issues/8385'}}, {'node': {'title': 'Allow optional properties to be truely optional', 'url': 'https://github.com/pydantic/pydantic/issues/8394'}}, {'node': {'title': '`@property` changes behavior when using dataclass-ish classes', 'url': 'https://github.com/pydantic/pydantic/issues/8401'}}, {'node': {'title': "Regex's pattern is not serialized when creating a model's JSON schema", 'url': 'https://github.com/pydantic/pydantic/issues/8405'}}, {'node': {'title': 'cached_property is not ignored when a model is copied and updated', 'url': 'https://github.com/pydantic/pydantic/issues/8406'}}, {'node': {'title': 'BaseModel causes information loss with Generic classes', 'url': 'https://github.com/pydantic/pydantic/issues/8410'}}, {'node': {'title': "JSON Schema is wrong when `mode='serialization'` and fields have a default", 'url': 'https://github.com/pydantic/pydantic/issues/8413'}}, {'node': {'title': 'Bytes and Bits Conversion Type', 'url': 'https://github.com/pydantic/pydantic/issues/8415'}}, {'node': {'title': 'Debian/ubuntu packages for v2', 'url': 'https://github.com/pydantic/pydantic/issues/8416'}}, {'node': {'title': "Union discriminator tag fails if '_' is in the literal value", 'url': 'https://github.com/pydantic/pydantic/issues/8417'}}]}}}
Пробежимся по issues и принтанем все title. Код будет примерно таким
for issue in result["repository"]["issues"]["edges"]:
print(issue["node"]["title"])
Не очень удобно. Хотелось бы иметь подсказки от IDE и писать что-то типа
for issue in repository.issues.edges:
print(issue.node.title)
Для этого можно создать классы, повторяющие типы из GraphQL схемы. Есть два вопроса
какой базовый класс использовать?
как, имея схему, генерить эти классы?
Базовый класс
Первый вариант — dataclasses.dataclass. Это хорошо, но возникает проблема при десериализации вложенных типов. Альтернативный вариант dataclasses-json. Это дата-классы дополненные методами from_json, from_dict. Некоторые проблемы при таком подходе
необходы свои сериализаторы/десериализаторы для типа union (в новых версиях проблема может быть решена);
преобразование объекта с глубокой вложенность может занимать значительное время;
Современный подход, решающий описанные проблемы — pydantic. Рассмотрим наш пример (не забывая добавить pydantic в проект)
from pydantic import BaseModel
class Issue(BaseModel):
title: str
url: str
class IssueEdge(BaseModel):
node: Issue
class IssueConnection(BaseModel):
edges: list[IssueEdge]
class Repository(BaseModel):
issues: IssueConnection
repository = Repository.model_validate(result["repository"])
for issue in repository.issues.edges:
print(issue.node.title)
Результат
Can't use config keyword argument with TypeAdapter.__init__ on stdlib dataclass
Constructor for model with `Json[list[int]]` field should accept `list[int]`, like mypy already expects
JSON serialization issue with ipaddress classes as alternative
Indeterminate: LookupError when generating custom schema output
Implement more clear warning/error when using constraints on compound types
Magic validation methods needs to be documented for version 2
More consistent and intuitive `alias` behavior for validation and serialization
Common mistakes docs section
Deprecate `update_json_schema` function
coerce_numbers_to_str needs a per-field variant
Bus error when using custom type impl with SQLAlchemy
Allow optional properties to be truely optional
`@property` changes behavior when using dataclass-ish classes
Regex's pattern is not serialized when creating a model's JSON schema
cached_property is not ignored when a model is copied and updated
BaseModel causes information loss with Generic classes
JSON Schema is wrong when `mode='serialization'` and fields have a default
Bytes and Bits Conversion Type
Debian/ubuntu packages for v2
Union discriminator tag fails if '_' is in the literal value
Генерация классов
Чтобы вручную создать все классы, описанные в схеме github GraphQL API, может потребоваться много времени и концентрации. Если же схема часто изменяется (что может быть на начальных этапах разработки) — нужно выполнять много рутинной работы по актуализации. Удобно иметь возможность генерить классы автоматически при изменении схемы.
Описанная проблема привела к созданию библиотеки graphql2python для генерации pydantic модели данных по некоторой GraphQL смехе. Недавно в бета режиме эти наработки перенесены в библиотеку datamodel-code-generator — известную библиотеку для генерации модели данных по Open API схеме.
Запустим datamodel-code-generator для нашей схемы
$ poetry add "datamodel-code-generator[graphql]"
$ datamodel-codegen --input tests/data/schema.docs.graphql --input-file-type graphql --output github_graphql_client/model.py
Классы из примера выше в формате datamodel-code-generator будут выглядеть следующим образом
# `github_graphql_client/model.py` file
...
class Issue(
Assignable,
Closable,
Comment,
Deletable,
Labelable,
Lockable,
Node,
ProjectV2Owner,
Reactable,
RepositoryNode,
Subscribable,
SubscribableThread,
UniformResourceLocatable,
Updatable,
UpdatableComment,
):
"""
An Issue is a place to discuss ideas, enhancements, tasks, and bugs for a project.
"""
...
...
class IssueEdge(BaseModel):
"""
An edge in a connection.
"""
cursor: String
node: Optional[Issue] = None
typename__: Optional[Literal['IssueEdge']] = Field(
'IssueEdge', alias='__typename'
)
...
class IssueConnection(BaseModel):
"""
The connection type for Issue.
"""
edges: Optional[List[Optional[IssueEdge]]] = Field(default_factory=list)
nodes: Optional[List[Optional[Issue]]] = Field(default_factory=list)
pageInfo: PageInfo
totalCount: Int
typename__: Optional[Literal['IssueConnection']] = Field(
'IssueConnection', alias='__typename'
)
...
class Repository(
Node,
PackageOwner,
ProjectOwner,
ProjectV2Recent,
RepositoryInfo,
Starrable,
Subscribable,
UniformResourceLocatable,
):
"""
A repository contains the content for a project.
"""
...
...
Пакет datamodel-code-generator имеет гибкую настройку результирующей модели данных. Например, можно указать
версию pydantic;
версию python;
альтернативный класс (например, dataclasses.dataclass);
имена полей для некоторых типов;
свои кастомные шаблоны (написанные так же на jinja2);
Подробнее со всеми возможностями datamodel-code-generator можно ознакомиться в документации.
Заключение
Мы написали клиент, который уже может приносить пользу при решении некоторых практических задач. Но все равно остался ряд тем, которые вышли за рамками данной статьи. Например,
кэширование на стороне клиента;
сервер отклоняет запрос из-за большого количества токенов в нем;
поддержка кастомных скаляров;
работа клиента через websockets;
что делать, если нужно одновременно совершить несколько сотен/тысяч однотипных мутаций?
Индустрия так же не стоит на месте. Наиболее актуальные инструменты для GraphQL клиентов можно посмотреть тут: https://graphql.org/code/#python-client. Например, стоит обратить внимание на молодой инструмент ariadne-codegen для генерации типизированного GraphQL клиента по вашим схемам и запросам.
Буду рад услышать ваш опыт работы с GraphQL клиентами на python в комментариях. Возможно, вы решали проблему, которая не упоминается в данной статье и можете о ней рассказать.
Спасибо за внимание!
Virviil
Исходя из моего опыта: если вы делаете межсервисное взаимодействие через graphql - вы делаете что-то не то. Более того: если ОБА сервиса - ваши, а не third party.
denisart Автор
Спасибо за комментарий.
У нас довольно сложная и объемная модель данных и большое количество клиентов этих данных. Удобно использовать преимущество GraphQL -- каждому получать именно те данные, которые требуются.
Так же, у нас некий аналог DDD подхода, а GraphQL служит схемой.
В нашей команде мы пришли к выводу, что не прогодали с GraphQL, но как верно вы заметили -- это не всегда удачное решение, которое к тому же имеет ряд проблем.
Вот статья про одну из них внутри нашего проекта: https://habr.com/ru/articles/707648/