Эта статья родилась из опыта использования GraphQL в проекте одного из крупнейших аэропортов РФ. Проект посвящен разработке системы по автоматизации обслуживания рейсов и управлению ресурсами аэропорта в реальном времени (MRMS система).

Проект реализован на базе микросервисной архитектуры, где модель данных аэропорта представлена в виде GraphQL API, а сервер, предоставляющий API, написан на java. Клиентами этого API являются не только web/mobile, но и сервисы на java, golang и python.

Статья написана как пошаговое руководство по созданию своего GraphQL клиента на python с нуля, где автор демонстрирует проблемы, возникающие на этом пути. Использовать реальную GraphQL схему аэропорта не представляется возможным, поэтому для наглядности будем использовать открытую схему github GraphQL API.

Содержание

  1. Что нас ждет

  2. Подготовка

  3. Первый запрос

  4. Синхронный клиент

  5. Асинхронный клиент

  6. Создаем клиент

  7. Работа с запросами

  8. Генерация модели данных

  9. Заключение

Что нас ждет

Вот некоторые из задач, которые мы рассмотрим

  • как совершить GraphQL запрос и получить ответ от сервера?

  • асинхронные запросы к серверу;

  • как управлять своей кодовой базой запросов?

  • хранение ответов от сервера в виде типизированных классов;

Упоминание новых инструментов будет всегда сопровождаться ссылкой на оф. документацию. Но для комфортного чтения статьи желательно знать основы GraphQL. Так же будет плюсом иметь опыт работы с библиотеками requests, asyncio и aiohttp.

Исходный код из статьи можно найти тут. Код написан на python 3.11 с использованием poetry, типизирован и отформатирован с помощью black.

Подготовка

Для начала подготовимся к работе с github GraphQL API. Во-первых, настроим наше python окружение. Будем использовать poetry и следующую начальную структуру файлов

├── github_graphql_client/  <- Тут будет код нашего клиента
│   └── __init__.py
├── tests/                  <- Тут будут тесты
│   └── __init__.py
├── scripts/                <- Тут будут различные скрипты для запуска
├── pyproject.toml          <- Файл с настройками проекта
├── README.md
├── .env                    <- Тут будут всякие sensitive переменные
└── .gitignore

Конфиг pyproject.toml следующий

# `pyproject.toml` file
[tool.poetry]
name = "github_graphql_client"
version = "0.1.0"
description = "Github GraphQL client for Habr."
authors = ["FirstName SecondName <email>"]
readme = "README.md"
packages = [{include = "github_graphql_client"}]

[tool.poetry.dependencies]
python = "^3.11"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

Во-вторых, Github GraphQL endpoint находится по адресу https://api.github.com/graphql. Чтобы воспользоваться API — нужно получить токен. Подробности про выпуск токена описаны тут: Github - Forming calls with GraphQL. В файле .env перечислим следующие параметры

GITHUB_TOKEN=<YOUR_TOKEN>
GITHUB_GRAPHQL_ENDPOINT=https://api.github.com/graphql

Для работы с файлом .env будем использовать пакет python-dotenv. Добавим его в качестве нашей первой зависимости

$ poetry add  python-dotenv
Creating virtualenv blah-blah-py3.11 in /home/blah-blah/pypoetry/virtualenvs
Using version ^1.0.0 for python-dotenv

Updating dependencies
Resolving dependencies... (0.1s)

Package operations: 1 install, 0 updates, 0 removals

  • Installing python-dotenv (1.0.0)

Writing lock file

После выполнения команды poetry add появится файл poetry.lock, а так же наша зависимость будет добавлена в pyproject.toml

# `pyproject.toml` file
# ...

[tool.poetry.dependencies]
python = "^3.11"
python-dotenv = "^1.0.0"

# ...

И, наконец, запустим poetry install для установки всех зависимостей.

Первый запрос

Основное, что должен уметь наш клиент — подключаться к серверу. Обычно GraphQL работает по протоколу HTTP/HTTPS через единственный POST, который ожидает на входе json с полями query: str и variables: dict[str, Any]. Так это работает и в github GraphQL API. Схему github GraphQL API можно посмотреть тут. Для начала спроектируем класс, который будет отвечать непосредственно за соединение с сервером и получение данных. Добавим несколько новых файлов

$ mkdir github_graphql_client/transport
$ touch github_graphql_client/transport/__init__.py github_graphql_client/transport/base.py

В файле github_graphql_client/transport/base.py определим BaseTransport

# `github_graphql_client/transport/base.py` file
from typing import Any

class BaseTransport:
    """An abstract transport."""

    def execute(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        """Execute GraphQL query."""
        raise NotImplementedError

    def connect(self) -> None:
        """Establish a session with the transport."""
        raise NotImplementedError

    def close(self) -> None:
        """Close a session."""
        raise NotImplementedError

Любой класс типа Transport должен наследоваться от BaseTransport. Для этого необходимо реализовать три метода

  • connect — метод для открытия соединения с сервером;

  • close — метод для закрытия соединения;

  • execute — метод для выполнения запроса query с переменными variables;

Так, как GraphQL запрос есть обычный POST запрос — клиент может быть реализован с помощью пакета requests. Для начала добавим зависимость

$ poetry add requests

В новый файл github_graphql_client/transport/requests.py добавим следующий код

# `github_graphql_client/transport/requests.py` file
from typing import Any, Optional

import requests as r

from github_graphql_client.transport.base import BaseTransport

class RequestsTransport(BaseTransport):
    """The transport based on requests library."""

    session: Optional[r.Session]

    def __init__(self, endpoint: str, token: str, **kwargs: Any) -> None:
        self.endpoint = endpoint
        self.token = token
        self.auth_header = {"Authorization": f"Bearer {self.token}"}

        self.session = None

    def connect(self) -> None:
        """Start a `requests.Session` connection."""
        if self.session is None:
            self.session = r.Session()
        else:
            raise Exception("Session already started")

    def close(self) -> None:
        """Closing `requests.Session` connection."""
        if self.session is not None:
            self.session.close()
            self.session = None

    def execute(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        """Execute GraphQL query."""
        if self.session is None:
            raise Exception(f"RequestsTransport session not connected")

        post_args = {
            "headers": self.auth_header,
            "json": {"query": query, "variables": variables},
        }
        post_args["headers"]["Content-Type"] = "application/json"

        response = self.session.request("POST", self.endpoint, **post_args)

        result = response.json()
        return result.get("data")

В данной реализации

  • метод connect создает объект requests.Session если он не был создан ранее;

  • метод close закрывает соединение для объекта requests.Session;

  • метод execute отправляет с помощью объекта requests.Session обычный POST запрос;

Давайте проверим, что с помощью этого класса мы уже можем получить данные. Отправим query, который должен вернуть несколько первых завершенных issues из выбранного github репозитория. Для примера, рассмотрим новый проект автора pydantic - FastUI. Хранить запросы будем в отдельной папке

$ mkdir github_graphql_client/queries
$ touch github_graphql_client/queries/__init__.py github_graphql_client/queries/repository.py
# `github_graphql_client/queries/repository.py` file
repository_issues_query = """
query {
  repository(owner:"pydantic", name:"FastUI") {
    issues(last:2, states:CLOSED) {
      edges {
        node {
          title
          url
        }
      }
    }
  }
}
"""

Для удобства добавим скрипт scripts/run.py

# `scripts/run.py` file
import os
import time
from typing import Any

from dotenv import load_dotenv

from github_graphql_client.client.requests_client import RequestsClient
from github_graphql_client.queries.repository import repository_issues_query

load_dotenv()  # take environment variables from .env

GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
GITHUB_GRAPHQL_ENDPOINT = os.environ.get("GITHUB_GRAPHQL_ENDPOINT")

def main():
    transport = RequestsTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT, token=GITHUB_TOKEN
    )
    transport.connect()

    data = transport.execute(
        query=repository_issues_query, variables={},
    )
    print(data)
    
    transport.close()

if __name__ == "__main__":
    main()

После запуска скрипта мы получим примерно следующее

$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}

Проект активно живет, поэтому сейчас issues будут другими.

Таймаут

Хорошо, если вы понимаете, сколько должен выполняться ваш запрос. Процитируем одну известную статью

Не ставить таймаут на задачу — это зло. Это значит, что вы не понимаете, что происходит в задаче, как должна работать бизнес-логика.

Добавим таймаут в RequestsTransport

# `github_graphql_client/transport/requests.py` file
...

class RequestsTransport(BaseTransport):
    """The transport based on requests library."""

    DEFAULT_TIMEOUT: int = 1
    session: Optional[r.Session]

    def __init__(self, endpoint: str, token: str, **kwargs: Any) -> None:
        ...

        self.timeout = kwargs.get("timeout", RequestsTransport.DEFAULT_TIMEOUT)

    ...

    def execute(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        ...

        post_args = {
            "headers": self.auth_header,
            "json": {"query": query, "variables": variables},
            "timeout": self.timeout,
        }
        ...

Проверим наш код указав очень маленький (для github GraphQL API) таймаут

# `scripts/run.py` file
...

def main():
    transport = RequestsTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT,
        token=GITHUB_TOKEN,
        timeout=0.0001,
    )
    ...

...

Запустив scripts/run.py, мы получим следующее

$ python3 scripts/run.py
...
requests.exceptions.ConnectTimeout: HTTPSConnectionPool(host='api.github.com', port=443): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f5510137d50>, 'Connection to api.github.com timed out. (connect timeout=0.0001)'))

По аналогии вы можете добавить свои настройки для requests, а мы пойдем дальше.

Синхронный клиент

Создадим клиент, который научим работать с BaseTransport. После выполнения команд

$ mkdir github_graphql_client/client
$ touch github_graphql_client/client/__init__.py github_graphql_client/client/sync_client.py

в файл github_graphql_client/client/sync_client.py добавим следующий код

# `github_graphql_client/client/sync_client.py` file
from typing import Any

from github_graphql_client.transport.base import BaseTransport

class SyncGraphQLClient:
    """Sync GraphQL client based on `BaseTransport` transport."""

    transport: BaseTransport

    def __init__(self, transport: BaseTransport) -> None:
        self.transport = transport

    def __enter__(self):
        self.connect_sync()
        return self

    def __exit__(self, *args):
        self.close_sync()

    def connect_sync(self) -> None:
        """Connect to `self.transport`."""
        self.transport.connect()

    def close_sync(self) -> None:
        """Close `self.transport` connection."""
        self.transport.close()

    def execute_sync(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        return self.transport.execute(query, variables, **kwargs)

Класс SyncGraphQLClient умеет запускать сессию для произвольного класса BaseTransport и получать результаты запросов опуская детали реализации самого запроса.

  • метод connect_sync создает соединение через self.transport;

  • метод close_sync закрывает соединение через self.transport;

  • метод execute_sync получает данные с сервера через self.transport;

  • методы __enter__ и __exit__ предназначены для того, чтобы запускать клиент в контекстном менеджере и не забывать закрыть соединение;

Проверим, что это работает

# `scripts/run.py` file
...

from github_graphql_client.client.sync_client import SyncGraphQLClient

...

def main():
    transport = RequestsTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT,
        token=GITHUB_TOKEN,
    )

    with SyncGraphQLClient(transport=transport) as client:
        data = client.execute_sync(
            query=repository_issues_query, variables={},
        )
        print(data)

...

Запустив scripts/run.py, получим тот же результат

$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}

Слово sync в названии SyncGraphQLClient не случайно. Немного позже у нас появится AsyncGraphQLClient, и мы объединим их в один GraphQLClient.

Несколько запросов

Давайте внутри одной сессии выполним несколько GraphQL запросов. Для этого изменим наш запрос так, чтобы иметь возможность указать название желаемого репозитория

# `github_graphql_client/queries/repository.py` file
def get_repository_issues_query(owner: str, name: str) -> str:
    return """query {
  repository(owner:"%s", name:"%s") {
    issues(last:2, states:CLOSED) {
      edges {
        node {
          title
          url
        }
      }
    }
  }
}
""" % (
        owner,
        name,
    )

Наш скрипт scripts/run.py будет выглядеть следующим образом

# `scripts/run.py` file
...

def main():
    transport = RequestsTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT,
        token=GITHUB_TOKEN,
    )

    with SyncGraphQLClient(transport=transport) as client:
        data = client.execute_sync(
            query=get_repository_issues_query("pydantic", "FastUI"),
            variables={},
        )
        print(data)
        
        data = client.execute_sync(
            query=get_repository_issues_query("pydantic", "pydantic"),
            variables={},
        )
        print(data)
        
        data = client.execute_sync(
            query=get_repository_issues_query("pydantic", "pydantic-core"),
            variables={},
        )
        print(data)

...

После запуска получим примерно следующее

$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
{'repository': {'issues': {'edges': [{'node': {'title': "__init__.cpython-311-darwin.so  is an incompatible architecture (have 'x86_64', need 'arm64') in M1 mac mini", 'url': 'https://github.com/pydantic/pydantic/issues/8396'}}, {'node': {'title': 'Override class used in annotations', 'url': 'https://github.com/pydantic/pydantic/issues/8408'}}]}}}
{'repository': {'issues': {'edges': [{'node': {'title': '2.14.4 release upload failed', 'url': 'https://github.com/pydantic/pydantic-core/issues/1082'}}, {'node': {'title': "(????) `ValidationError` can't be instantiated", 'url': 'https://github.com/pydantic/pydantic-core/issues/1115'}}]}}}

Асинхронный клиент

В предыдущем примере мы ожидали ответ на первый запрос и только после этого отправляли второй запрос. Зачем нам тратить столько времени? Давайте реализуем асинхронное соединение.

В файле github_graphql_client/transport/base.py добавим класс BaseAsyncTransport по аналогии с BaseTransport

# `github_graphql_client/transport/base.py` file
...

class BaseAsyncTransport:
    """An abstract async transport."""

    async def execute(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        """Execute GraphQL query."""
        raise NotImplementedError

    async def connect(self) -> None:
        """Establish a session with the transport."""
        raise NotImplementedError

    async def close(self) -> None:
        """Close a session."""
        raise NotImplementedError

Для примера реализуем асинхронный backend на aiohttp. Добавим этот пакет в проект

$ poetry add aiohttp

В новом файле github_graphql_client/transport/aiohttp.py создадим класс AIOHTTPTransport

# `github_graphql_client/transport/aiohttp.py` file
from typing import Any, Optional

import aiohttp

from github_graphql_client.transport.base import BaseAsyncTransport

class AIOHTTPTransport(BaseAsyncTransport):
    """The transport based on aiohttp library."""

    DEFAULT_TIMEOUT = 1
    session: Optional[aiohttp.ClientSession]

    def __init__(self, endpoint: str, token: str, **kwargs: Any) -> None:
        self.endpoint = endpoint
        self.token = token
        self.auth_header = {"Authorization": f"Bearer {self.token}"}
        self.timeout = kwargs.get("timeout", AIOHTTPTransport.DEFAULT_TIMEOUT)

        self.session = None

    async def connect(self) -> None:
        """Coroutine which will create an aiohttp ClientSession() as self.session."""
        if self.session is None:
            self.session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.timeout),
                headers=self.auth_header,
            )
        else:
            raise Exception(f"AIOHTTPTransport is already connected")

    async def close(self) -> None:
        """Coroutine which will close the aiohttp session."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def execute(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        """Execute GraphQL query with aiohttp."""
        if self.session is None:
            raise Exception(f"AIOHTTPTransport session not connected")

        async with self.session.post(
            self.endpoint,
            json={"query": query, "variables": variables},
        ) as response:
            data = await response.json()

        return data.get("data")

Тут все очень похоже на RequestsTransport

  • connect — это корутина, которая создает сессию с помощью aiohttp.ClientSession;

  • close — корутина, которая завершает aiohttp.ClientSession;

  • execute — корутина, которая с помощью открытой aiohttp.ClientSession асинхронно получает данные с сервера;

  • для правильной установки таймаута используем aiohttp.ClientTimeout;

Проверим асинхронную работу AIOHTTPTransport с помощью asyncio

# `scripts/run.py` file
import asyncio
...

async def run_aiohttp_transport():
    transport = AIOHTTPTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT,
        token=GITHUB_TOKEN,
    )
    await transport.connect()

    data = await transport.execute(
        query=get_repository_issues_query("pydantic", "FastUI"),
        variables={},
    )
    print(data)

    await transport.close()

def amain():
    asyncio.run(run_aiohttp_transport())

if __name__ == "__main__":
    amain()

Результат нам уже знаком

$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}

Создадим теперь асинхронный клиент для работы с BaseAsyncTransport

# `github_graphql_client/client/async_client.py` file`
from typing import Any

from github_graphql_client.transport.base import BaseAsyncTransport

class AsyncGraphQLClient:
    """Async GraphQL client based on `BaseAsyncTransport` transport."""

    transport: BaseAsyncTransport

    def __init__(self, transport: BaseAsyncTransport) -> None:
        self.transport = transport

    async def __aenter__(self):
        await self.connect_async()
        return self

    async def __aexit__(self, *args):
        await self.close_async()

    async def connect_async(self) -> None:
        """Connect to `self.transport`."""
        await self.transport.connect()

    async def close_async(self) -> None:
        """Close `self.transport` connection."""
        await self.transport.close()

    async def execute_async(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        return await self.transport.execute(query, variables, **kwargs)

Тут все почти так же, как в SyncGraphQLClient, но асинхронно. Проверим

# `scripts/run.py` file
...

async def amain():
    transport = AIOHTTPTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT,
        token=GITHUB_TOKEN,
    )

    async with AsyncGraphQLClient(transport=transport) as client:
        data = await client.execute_async(
            query=get_repository_issues_query("pydantic", "FastUI", last=2),
            variables={},
        )
        print(data)

def main():
    asyncio.run(amain())

if __name__ == "__main__":
    main()
$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}

Теперь перепишем последний пример предыдущего раздела с помощью нового асинхронного клиента

# `scripts/run.py` file
...

def check_execute(fn):
    async def wrapper(*args, **kwargs):
        tic = time.perf_counter()
        result = await fn(*args, **kwargs)
        toc = time.perf_counter()
        print(f"Duration for {fn.__name__} is {toc - tic:0.4f} seconds")
        return result

    return wrapper

@check_execute
async def execute(client, query: str, variables: dict[str, Any]) -> None:
    data = await client.execute_async(query, variables)
    print(data)

@check_execute
async def amain():
    transport = AIOHTTPTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT,
        token=GITHUB_TOKEN,
    )

    async with AsyncGraphQLClient(transport=transport) as client:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(
                execute(
                    client,
                    query=get_repository_issues_query(
                        "pydantic", "FastUI",
                    ),
                    variables={},
                )
            )

            tg.create_task(
                execute(
                    client,
                    query=get_repository_issues_query(
                        "pydantic", "pydantic",
                    ),
                    variables={},
                )
            )
            
            tg.create_task(
                execute(
                    client,
                    query=get_repository_issues_query(
                        "pydantic", "pydantic-core",
                    ),
                    variables={},
                )
            )

def main():
    asyncio.run(amain())

...

В контекстном менеджере клиента AsyncGraphQLClient мы асинхронно запускаем несколько задач execute. Это реализовано с помощью asyncio.TaskGroup. Декоратор check_execute выводит для нас время выполнения

$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
Duration for execute is 0.5999 seconds
{'repository': {'issues': {'edges': [{'node': {'title': '2.14.4 release upload failed', 'url': 'https://github.com/pydantic/pydantic-core/issues/1082'}}, {'node': {'title': "(????) `ValidationError` can't be instantiated", 'url': 'https://github.com/pydantic/pydantic-core/issues/1115'}}]}}}
Duration for execute is 0.5987 seconds
{'repository': {'issues': {'edges': [{'node': {'title': "__init__.cpython-311-darwin.so  is an incompatible architecture (have 'x86_64', need 'arm64') in M1 mac mini", 'url': 'https://github.com/pydantic/pydantic/issues/8396'}}, {'node': {'title': 'Override class used in annotations', 'url': 'https://github.com/pydantic/pydantic/issues/8408'}}]}}}
Duration for execute is 0.5991 seconds
Duration for amain is 0.6005 seconds

Теперь мы получили, что время выполнения функции amain почти равно времени выполнения самого длительного запроса.

Создаем клиент

Пора объединить SyncGraphQLClient и AsyncGraphQLClient в один GraphQLClient, чтобы использовать один интерфейс. Создадим файл github_graphql_client/client/client.py

# `github_graphql_client/client/client.py` file
import asyncio
from typing import Any, Union

from github_graphql_client.transport.base import (
    BaseAsyncTransport,
    BaseTransport,
)

from .async_client import AsyncGraphQLClient
from .sync_client import SyncGraphQLClient

class GraphQLClient(SyncGraphQLClient, AsyncGraphQLClient):
    """GraphQL client."""

    transport: Union[BaseTransport, BaseAsyncTransport]

    def __init__(
        self, transport: Union[BaseTransport, BaseAsyncTransport]
    ) -> None:
        super().__init__(transport)

    async def _execute_async(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        async with self as client:
            data = await client.execute_async(
                query,
                variables,
                **kwargs,
            )
        return data

    async def _execute_batch_async(
        self,
        queries: list[str],
        variables: list[dict[str, Any]],
        **kwargs: Any,
    ) -> list[dict[str, Any]]:
        tasks = []

        async with self as client:
            for i in range(len(queries)):
                query = queries[i]
                vars = variables[i]

                tasks.append(client.execute_async(query, vars, **kwargs))

            result_data = await asyncio.gather(*tasks)

        return list(result_data)

    def execute(
        self, query: str, variables: dict[str, Any], **kwargs: Any
    ) -> dict[str, Any]:
        """Execute GraphQL query."""

        if isinstance(self.transport, BaseAsyncTransport):
            return asyncio.run(self._execute_async(query, variables, **kwargs))
        else:
            with self as client:
                return client.execute_sync(query, variables, **kwargs)

    def execute_batch(
        self,
        queries: list[str],
        variables: list[dict[str, Any]],
        **kwargs: Any,
    ) -> list[dict[str, Any]]:
        """Execute a batch of GraphQL queries."""

        if isinstance(self.transport, BaseAsyncTransport):
            return asyncio.run(
                self._execute_batch_async(queries, variables, **kwargs)
            )
        else:
            results = []
            with self as client:
                for i in range(len(queries)):
                    query = queries[i]
                    vars = variables[i]

                    res = client.execute_sync(query, vars, **kwargs)

                    results.append(res)

            return results

Два основных метода для использования: execute и execute_batch. Внутри эти методы определяют какой тип у self.transport, а после вызывают код из наших прошлых примеров. Попробовать все вариант можно с помощью следующего кода

# `scripts/run.py` file
...

def main():
    transport_r = RequestsTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT,
        token=GITHUB_TOKEN,
    )
    transport_a = AIOHTTPTransport(
        endpoint=GITHUB_GRAPHQL_ENDPOINT,
        token=GITHUB_TOKEN,
    )

    client_r = GraphQLClient(transport=transport_r)
    client_a = GraphQLClient(transport=transport_a)

    q_1 = get_repository_issues_query(
        "pydantic",
        "FastUI",
    )
    q_2 = get_repository_issues_query(
        "pydantic",
        "pydantic",
    )
    q_3 = get_repository_issues_query(
        "pydantic",
        "pydantic-core",
    )

    data_r1 = client_r.execute(q_1, {})
    data_a1 = client_a.execute(q_1, {})

    data_r3 = client_r.execute_batch([q_1, q_2, q_3], [{}, {}, {}])
    data_a3 = client_a.execute_batch([q_1, q_2, q_3], [{}, {}, {}])

...

Поздравляю, мы только что написали свою упрощенную версию библиотеки gql. gql основана на той же идее, но имеет много дополнительной обвязки

  • различные реализации backend: requests, async, websockets;

  • различные проверки, которые мы осознанно опускали;

  • валидация запросов;

  • проверка ошибки запроса от сервера;

Пробуем gql

Мы не будем приводить всю документацию gql, но рассмотрим один пример. Выполним наш запрос с помощью gql. Установим пакет следующим образом

$ poetry add "gql[all]"

И запустим пример из документации для нашего запроса

# `scripts/run.py` file
...

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

...

def check_execute(fn):
    def wrapper(*args, **kwargs):
        tic = time.perf_counter()
        result = fn(*args, **kwargs)
        toc = time.perf_counter()
        print(f"Duration for {fn.__name__} is {toc - tic:0.4f} seconds")
        return result

    return wrapper

@check_execute
def main():
    transport = AIOHTTPTransport(
        url=GITHUB_GRAPHQL_ENDPOINT,
        headers={"Authorization": f"Bearer {GITHUB_TOKEN}"},
    )
    client = Client(transport=transport, fetch_schema_from_transport=True)

    # Provide a GraphQL query
    query = gql(get_repository_issues_query("pydantic", "FastUI", last=2))

    result = client.execute(query)
    print(result)

...

Результат будет примерно следующим

$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': 'More PageEvent Triggers', 'url': 'https://github.com/pydantic/FastUI/issues/104'}}, {'node': {'title': 'TypeError: Interval() takes no arguments', 'url': 'https://github.com/pydantic/FastUI/issues/105'}}]}}}
Duration for main is 8.5059 seconds

Ого, 8 секунд! Но запрос тот же самый. Значит время уходит не на ожидание ответа от сервера, а на накладные расходы самой библиотеки. Эти накладные расходы связаны с параметром fetch_schema_from_transport.

Пакет gql базируется на библиотеке graphql-core, которая является python реализацией проекта GraphQL.js. И если параметр fetch_schema_from_transport имеет значение True, то во время первого запроса клиент фетчит схему и билдит ее в формат graphql_core.GraphQLSchema.

На момент написания статьи схема github GraphQL API содержит 61602 строки. Т.е. все 8 секунд уходят именно на анализ схемы, которая, конечно, довольно большая. Если fetch_schema_from_transport=False, то время выполнения запроса будет совпадать с нашим клиентом.

Работа с запросами

Только что мы увидели проблему фетчинга большой схемы. Фетчинг происходит для того, чтобы произвести валидацию запросов на клиенте. Если зафиксировать схему, с которой мы работаем, то валидацию можно перенести на уровень тестов.

Валидация запросов

Установим pytest

$ poetry add pytest --group test

Сохраним GraphQL схему локально в файле tests/data/schema.docs.graphql. Тест для валидации нашего запроса repository будет следующим

# `tests/test_queries_validation.py` file
from pathlib import Path

from graphql import Source, parse, build_schema, validate, GraphQLSchema

from github_graphql_client.queries.repository import get_repository_issues_query

SCHEMA_FILENAME = Path(__file__).parent / Path("data/schema.docs.graphql")

def get_schema() -> GraphQLSchema:
    with SCHEMA_FILENAME.open("r", encoding="utf8") as f:
        schema_str = f.read()

    return build_schema(schema_str)

schema = get_schema()

def test_repository_issues_query():
    query = get_repository_issues_query("pydantic", "FastUI")
    document = parse(Source(query))

    validation_errors = validate(schema, document)
    if validation_errors:
        raise validation_errors[0]

Что происходит

  • с помощью библиотеки graphql-core создаем объект schema: graphql.GraphQLSchema используя схему в формате строки;

  • парсим запрос с помощью метода graphql.parse;

  • используем метод graphql.validate(schema, document) для валидации запроса;

Если использовать gql, то gql.Client предоставляет готовый метод валидации gql.Client.validate.

Для примера, допустим ошибку в названии поля issues внутри нашего запроса и запустим тест

# Файл `github_graphql_client/queries/repository.py`
repository_issues_query = """
...
  repository(owner:"pydantic", name:"FastUI") {
    issuess(last:2, states:CLOSED) {
...
"""

Тест скажет нам примерно следующее

document = DocumentNode at 0:183

    def validate(self, document: DocumentNode):
        """:meta private:"""
        assert (
            self.schema
        ), "Cannot validate the document locally, you need to pass a schema."
    
        validation_errors = validate(self.schema, document)
        if validation_errors:
>           raise validation_errors[0]
E           graphql.error.graphql_error.GraphQLError: Cannot query field 'issuess' on type 'Repository'. Did you mean 'issues' or 'issue'?
E           
E           GraphQL request:4:5
E           3 |   repository(owner:"pydantic", name:"FastUI") {
E           4 |     issuess(last:2, states:CLOSED) {
E             |     ^
E           5 |       edges {

GraphQL variables

Мы еще ни разу не использовали GraphQL variables. Рассмотрим следующий запрос marketplaceCategories к github GraphQL API

"""
Get alphabetically sorted list of Marketplace categories
"""
marketplaceCategories(
  """
  Exclude categories with no listings.
  """
  excludeEmpty: Boolean

  """
  Returns top level categories only, excluding any subcategories.
  """
  excludeSubcategories: Boolean

  """
  Return only the specified categories.
  """
  includeCategories: [String!]
): [MarketplaceCategory!]!

Добавим его в github_graphql_client/queries/

# `github_graphql_client/queries/marketplaceCategories.py` file
def get_marketplace_categories(
    exclude_empty: bool,
    exclude_subcategories: bool,
    include_categories: list[str],
) -> str:
    return """query {
  marketplaceCategories(excludeEmpty: %s, excludeSubcategories: %s, includeCategories: %s) {
    id
    description
  } 
}""" % (
        str(exclude_empty).lower(),
        str(exclude_subcategories).lower(),
        str(include_categories).replace("'", '"'),
    )

Чтобы создать корректную строку с GraphQL запросом с помощью форматирования строк, нам необходимо

  • преобразовать булево значение True/False в строку true/false;

  • преобразовать массив строк ['1', '2', '3'] в строку '["1", "2", "3"]';

Бывают и более сложные кейсы, когда нам нужен массив enum значений. Т.е. массив ["A", "B"] необходимо преобразовать в строку '[A, B]'. Чтобы этого избежать эти преобразования и нужны GraphQL variables

# `github_graphql_client/queries/marketplaceCategories.py` file
from typing import Any

def get_marketplace_categories(
    exclude_empty: bool,
    exclude_subcategories: bool,
    include_categories: list[str],
) -> tuple[str, dict[str, Any]]:
    variables = {
        "excludeEmpty": exclude_empty,
        "excludeSubcategories": exclude_subcategories,
        "includeCategories": include_categories,
    }

    query = """query(
  $excludeEmpty: Boolean
  $excludeSubcategories: Boolean
  $includeCategories: [String!]
) {
  marketplaceCategories(
    excludeEmpty: $excludeEmpty,
    excludeSubcategories: $excludeSubcategories,
    includeCategories: $includeCategories
  ) {
    id
    description
  } 
}"""

    return query, variables

Теперь наша функция get_marketplace_categories возвращает строку с запросом и словарь variables. Не забудем добавить тест

# `tests/test_queries_validation.py` file
...

from github_graphql_client.queries.marketplaceCategories import \
    get_marketplace_categories

...

def test_get_marketplace_categories():
    query, _ = get_marketplace_categories(True, False, ["1", "2", "3"])
    document = parse(Source(query))

    validation_errors = validate(schema, document)
    if validation_errors:
        raise validation_errors[0]

Пакет graphql-query

Какие еще могут возникнуть проблему при управлении GraphQL запросами

  • шарить повторяющиеся куски запроса между разными запросами;

  • использовать фрагменты и инлайн-фрагменты;

  • использовать несколько GraphQL запросов внутри одного;

  • использовать алиасы для имени запроса;

Чем больше становится объем запросов в нашем проекте, тем сложнее всем этим управлять. По этим причинам была разработана библиотека graphql-query. Она предоставляет возможность генерации валидных строк с GraphQL запросами используя python классы. Генерация основана на библиотеке jinja2.

Документацию graphql-query можно посмотреть тут: https://denisart.github.io/graphql-query/. Так же есть статья на habr с пересказом документации на русском языке.

Для демонстрации перепишем наши текущие запросы используя graphql-query

$ poetry add graphql-query

Структура того, как устроен запрос

  • Создаем объект graphql_query.Operation, который есть наш запрос;

  • Указываем тип для graphql_query.Operation: query, mutation, subscription;

  • Указываем имя graphql_query.Operation. Это шаг не обязательный, но является хорошей практикой. Сервер сможет по названию отслеживать метрики этого запроса именно от вашего сервиса;

  • Добавляем в graphql_query.Operation все необходимые variables в формате graphql_query.Variable;

  • Сами запросы создаем через graphql_query.Query;

  • У graphql_query.Query указываем имя, список аргументов как массив graphql_query.Argument и список желаемых полей.

Поля, переменные, аргументы задаются специальными классами. А значит их легко шарить между разными запросами. Если у вас в проекте всего один запрос, то такой подход может усложнить разработку, потому что кода становится больше.

Пример запроса marketplaceCategories в формате graphql-query

# `github_graphql_client/queries/marketplaceCategories.py` file
from typing import Any

from graphql_query import Operation, Argument, Variable, Field, Query

var_exclude_empty = Variable(name="excludeEmpty", type="Boolean")
var_exclude_subcategories = Variable(
    name="excludeSubcategories", type="Boolean"
)
var_include_categories = Variable(name="includeCategories", type="[String!]")

def get_marketplace_categories(
    exclude_empty: bool,
    exclude_subcategories: bool,
    include_categories: list[str],
) -> tuple[str, dict[str, Any]]:
    marketplace_categories_query = Query(
        name="marketplaceCategories",
        arguments=[
            Argument(name="excludeEmpty", value=var_exclude_empty),
            Argument(
                name="excludeSubcategories",
                value=var_exclude_subcategories,
            ),
            Argument(name="includeCategories", value=var_include_categories),
        ],
        fields=["id", "description"],
    )

    operation = Operation(
        type="query",
        name="getMarketplaceCategories",
        variables=[
            var_exclude_empty,
            var_exclude_subcategories,
            var_include_categories,
        ],
        queries=[marketplace_categories_query],
    )

    variables = {
        var_exclude_empty.name: exclude_empty,
        var_exclude_subcategories.name: exclude_subcategories,
        var_include_categories.name: include_categories,
    }

    return operation.render(), variables

Вызов operation.render() вернет нам следующий результат

query getMarketplaceCategories(
  $excludeEmpty: Boolean
  $excludeSubcategories: Boolean
  $includeCategories: [String!]
) {
  marketplaceCategories(
    excludeEmpty: $excludeEmpty
    excludeSubcategories: $excludeSubcategories
    includeCategories: $includeCategories
  ) {
    id
    description
  }
}

Не забываем убедиться, что тест test_get_marketplace_categories все так же завершается успешно. Остальные примеры использования (включая graphql_query.Fragment и graphql_query.InlineFragment) можно посмотреть в документации.

Генерация модели данных

Давайте запросим 20 последних открытых issues библиотеки pydantic. Немного обновим наш запрос repository

# `github_graphql_client/queries/repository.py` file
...
var_issue_state = Variable(name="IssueState", type="[IssueState!]")

...

def get_repository_issues_query(
    owner: str,
    name: str,
    last: int,
    state: str,
) -> tuple[str, dict[str, Any]]:
    ...
    
    operation = Operation(
        ...
        variables=[var_owner, var_name, var_last, var_issue_state],
        ...
    )

    return operation.render(), {
        var_owner.name: owner,
        var_name.name: name,
        var_last.name: last,
        var_issue_state.name: state,
    }

Результат запроса get_repository_issues_query("pydantic", "pydantic", 20, "OPEN") будет примерно следующим

$ python3 scripts/run.py
{'repository': {'issues': {'edges': [{'node': {'title': "Can't use config keyword argument with TypeAdapter.__init__ on stdlib dataclass", 'url': 'https://github.com/pydantic/pydantic/issues/8326'}}, {'node': {'title': 'Constructor for model with `Json[list[int]]` field should accept `list[int]`, like mypy already expects', 'url': 'https://github.com/pydantic/pydantic/issues/8336'}}, {'node': {'title': 'JSON serialization issue with ipaddress classes as alternative', 'url': 'https://github.com/pydantic/pydantic/issues/8343'}}, {'node': {'title': 'Indeterminate: LookupError when generating custom schema output', 'url': 'https://github.com/pydantic/pydantic/issues/8359'}}, {'node': {'title': 'Implement more clear warning/error when using constraints on compound types', 'url': 'https://github.com/pydantic/pydantic/issues/8362'}}, {'node': {'title': 'Magic validation methods needs to be documented for version 2', 'url': 'https://github.com/pydantic/pydantic/issues/8374'}}, {'node': {'title': 'More consistent and intuitive `alias` behavior for validation and serialization', 'url': 'https://github.com/pydantic/pydantic/issues/8379'}}, {'node': {'title': 'Common mistakes docs section', 'url': 'https://github.com/pydantic/pydantic/issues/8380'}}, {'node': {'title': 'Deprecate `update_json_schema` function', 'url': 'https://github.com/pydantic/pydantic/issues/8381'}}, {'node': {'title': 'coerce_numbers_to_str needs a per-field variant', 'url': 'https://github.com/pydantic/pydantic/issues/8383'}}, {'node': {'title': 'Bus error when using custom type impl with SQLAlchemy ', 'url': 'https://github.com/pydantic/pydantic/issues/8385'}}, {'node': {'title': 'Allow optional properties to be truely optional', 'url': 'https://github.com/pydantic/pydantic/issues/8394'}}, {'node': {'title': '`@property` changes behavior when using dataclass-ish classes', 'url': 'https://github.com/pydantic/pydantic/issues/8401'}}, {'node': {'title': "Regex's pattern is not serialized when creating a model's JSON schema", 'url': 'https://github.com/pydantic/pydantic/issues/8405'}}, {'node': {'title': 'cached_property is not ignored when a model is copied and updated', 'url': 'https://github.com/pydantic/pydantic/issues/8406'}}, {'node': {'title': 'BaseModel causes information loss with Generic classes', 'url': 'https://github.com/pydantic/pydantic/issues/8410'}}, {'node': {'title': "JSON Schema is wrong when `mode='serialization'` and fields have a default", 'url': 'https://github.com/pydantic/pydantic/issues/8413'}}, {'node': {'title': 'Bytes and Bits Conversion Type', 'url': 'https://github.com/pydantic/pydantic/issues/8415'}}, {'node': {'title': 'Debian/ubuntu packages for v2', 'url': 'https://github.com/pydantic/pydantic/issues/8416'}}, {'node': {'title': "Union discriminator tag fails if '_' is in the literal value", 'url': 'https://github.com/pydantic/pydantic/issues/8417'}}]}}}

Пробежимся по issues и принтанем все title. Код будет примерно таким

for issue in result["repository"]["issues"]["edges"]:
    print(issue["node"]["title"])

Не очень удобно. Хотелось бы иметь подсказки от IDE и писать что-то типа

for issue in repository.issues.edges:
    print(issue.node.title)

Для этого можно создать классы, повторяющие типы из GraphQL схемы. Есть два вопроса

  • какой базовый класс использовать?

  • как, имея схему, генерить эти классы?

Базовый класс

Первый вариант — dataclasses.dataclass. Это хорошо, но возникает проблема при десериализации вложенных типов. Альтернативный вариант dataclasses-json. Это дата-классы дополненные методами from_json, from_dict. Некоторые проблемы при таком подходе

  • необходы свои сериализаторы/десериализаторы для типа union (в новых версиях проблема может быть решена);

  • преобразование объекта с глубокой вложенность может занимать значительное время;

Современный подход, решающий описанные проблемы — pydantic. Рассмотрим наш пример (не забывая добавить pydantic в проект)

from pydantic import BaseModel

class Issue(BaseModel):
    title: str
    url: str

class IssueEdge(BaseModel):
    node: Issue

class IssueConnection(BaseModel):
    edges: list[IssueEdge]

class Repository(BaseModel):
    issues: IssueConnection

repository = Repository.model_validate(result["repository"])

for issue in repository.issues.edges:
    print(issue.node.title)

Результат

Can't use config keyword argument with TypeAdapter.__init__ on stdlib dataclass
Constructor for model with `Json[list[int]]` field should accept `list[int]`, like mypy already expects
JSON serialization issue with ipaddress classes as alternative
Indeterminate: LookupError when generating custom schema output
Implement more clear warning/error when using constraints on compound types
Magic validation methods needs to be documented for version 2
More consistent and intuitive `alias` behavior for validation and serialization
Common mistakes docs section
Deprecate `update_json_schema` function
coerce_numbers_to_str needs a per-field variant
Bus error when using custom type impl with SQLAlchemy 
Allow optional properties to be truely optional
`@property` changes behavior when using dataclass-ish classes
Regex's pattern is not serialized when creating a model's JSON schema
cached_property is not ignored when a model is copied and updated
BaseModel causes information loss with Generic classes
JSON Schema is wrong when `mode='serialization'` and fields have a default
Bytes and Bits Conversion Type
Debian/ubuntu packages for v2
Union discriminator tag fails if '_' is in the literal value

Генерация классов

Чтобы вручную создать все классы, описанные в схеме github GraphQL API, может потребоваться много времени и концентрации. Если же схема часто изменяется (что может быть на начальных этапах разработки) — нужно выполнять много рутинной работы по актуализации. Удобно иметь возможность генерить классы автоматически при изменении схемы.

Описанная проблема привела к созданию библиотеки graphql2python для генерации pydantic модели данных по некоторой GraphQL смехе. Недавно в бета режиме эти наработки перенесены в библиотеку datamodel-code-generator — известную библиотеку для генерации модели данных по Open API схеме.

Запустим datamodel-code-generator для нашей схемы

$ poetry add "datamodel-code-generator[graphql]"
$ datamodel-codegen --input tests/data/schema.docs.graphql --input-file-type graphql --output github_graphql_client/model.py

Классы из примера выше в формате datamodel-code-generator будут выглядеть следующим образом

# `github_graphql_client/model.py` file
...

class Issue(
    Assignable,
    Closable,
    Comment,
    Deletable,
    Labelable,
    Lockable,
    Node,
    ProjectV2Owner,
    Reactable,
    RepositoryNode,
    Subscribable,
    SubscribableThread,
    UniformResourceLocatable,
    Updatable,
    UpdatableComment,
):
    """
    An Issue is a place to discuss ideas, enhancements, tasks, and bugs for a project.
    """

    ...

...

class IssueEdge(BaseModel):
    """
    An edge in a connection.
    """

    cursor: String
    node: Optional[Issue] = None
    typename__: Optional[Literal['IssueEdge']] = Field(
        'IssueEdge', alias='__typename'
    )

...

class IssueConnection(BaseModel):
    """
    The connection type for Issue.
    """

    edges: Optional[List[Optional[IssueEdge]]] = Field(default_factory=list)
    nodes: Optional[List[Optional[Issue]]] = Field(default_factory=list)
    pageInfo: PageInfo
    totalCount: Int
    typename__: Optional[Literal['IssueConnection']] = Field(
        'IssueConnection', alias='__typename'
    )

...

class Repository(
    Node,
    PackageOwner,
    ProjectOwner,
    ProjectV2Recent,
    RepositoryInfo,
    Starrable,
    Subscribable,
    UniformResourceLocatable,
):
    """
    A repository contains the content for a project.
    """
    
    ...

...

Пакет datamodel-code-generator имеет гибкую настройку результирующей модели данных. Например, можно указать

  • версию pydantic;

  • версию python;

  • альтернативный класс (например, dataclasses.dataclass);

  • имена полей для некоторых типов;

  • свои кастомные шаблоны (написанные так же на jinja2);

Подробнее со всеми возможностями datamodel-code-generator можно ознакомиться в документации.

Заключение

Мы написали клиент, который уже может приносить пользу при решении некоторых практических задач. Но все равно остался ряд тем, которые вышли за рамками данной статьи. Например,

  • кэширование на стороне клиента;

  • сервер отклоняет запрос из-за большого количества токенов в нем;

  • поддержка кастомных скаляров;

  • работа клиента через websockets;

  • что делать, если нужно одновременно совершить несколько сотен/тысяч однотипных мутаций?

Индустрия так же не стоит на месте. Наиболее актуальные инструменты для GraphQL клиентов можно посмотреть тут: https://graphql.org/code/#python-client. Например, стоит обратить внимание на молодой инструмент ariadne-codegen для генерации типизированного GraphQL клиента по вашим схемам и запросам.

Буду рад услышать ваш опыт работы с GraphQL клиентами на python в комментариях. Возможно, вы решали проблему, которая не упоминается в данной статье и можете о ней рассказать.

Спасибо за внимание!

Комментарии (2)


  1. Virviil
    25.12.2023 09:25

    Исходя из моего опыта: если вы делаете межсервисное взаимодействие через graphql - вы делаете что-то не то. Более того: если ОБА сервиса - ваши, а не third party.


    1. denisart Автор
      25.12.2023 09:25

      Спасибо за комментарий.

      У нас довольно сложная и объемная модель данных и большое количество клиентов этих данных. Удобно использовать преимущество GraphQL -- каждому получать именно те данные, которые требуются.

      Так же, у нас некий аналог DDD подхода, а GraphQL служит схемой.

      В нашей команде мы пришли к выводу, что не прогодали с GraphQL, но как верно вы заметили -- это не всегда удачное решение, которое к тому же имеет ряд проблем.
      Вот статья про одну из них внутри нашего проекта: https://habr.com/ru/articles/707648/