Однажды тимлид поставил передо мной задачу реализовать механизм взаимодействия пользователя через веб-интерфейс с микросервисами через единую точку входа с использованием FastAPI и RabbitMQ. Спешу поделиться с тобой, мой читатель, тем, что у меня получилось. По мере повествования дам пояснения по представленному коду. И, да, сделаю интересные отступления по вопросам валидации и хранения, в т.ч. приватных, данных.

Задача

Пользователь через единый веб-интерфейс («Front» side) должен обращаться к микросервисной архитектуре, расположенной в закрытом контуре («Back» side). Взаимодействие должно быть построено посредством очередей RabbitMQ. Все работает под управлением Python, FastAPI и Gunicorn. В графическом виде получается что‑то такое:

Принципиальная схема взаимодействия объектов архитектуры
Принципиальная схема взаимодействия объектов архитектуры

Планирование

API-шлюз будет принимать запрос от пользователя и исходя из частностей поставленной задачи интерпретировать. Далее будет отправлять в определенном формате сообщение в очередь RabbitMQ и переходить в режим «ожидания». С другой стороны RabbitMQ так же в режиме ожидания сообщений будет работать RPC-сервер. При получении запроса он определит, в какое из приложений его перенаправить, запулит его в сторону соответствующего Gunicorn, который распределит нагрузку между экземплярами микросервиса и полученный ответ вернет RPC-серверу, а тот в свою очередь прокинет его дальше в специально созданную очередь. С той стороны «ждущий» шлюз получит это сообщение, выйдет из режима ожидания и сформированный ответ вернет пользователю.

Исполнение

Объединим все модули реализуемой схемы в одном проекте, подразумевая при этом, что у каждой составляющей может быть свое хранилище, виртуальное пространство, способы и команды запуска и т.д. Сервисы будут простые простейшие, но достаточные для представления и понимания общей концепции. Полный исходный код в моем репозитории.

Создадим корневой каталог и перейдем в него:

mkdir api-gateway && cd api-gateway

Сформируем файл с основными используемыми библиотеками и назовем его традиционно requirements.txt, остальные подтянутся зависимостями:

echo "
aiohttp==3.8.3
aio-pika==8.2.5
fastapi==0.88.0
gunicorn==20.1.0
path==16.5.0
PyYAML==6.0
pyyaml-tags==1.0.4
six==1.16.0
SQLAlchemy==1.4.45
uvicorn==0.20.0
" > requirements.txt

Создадим виртуальное окружение в каталоге.venv3.10 и активируем его. Кстати, я использую Python 3.10.6 под Ubuntu 20.04:

python3 -m venv .venv3.10 && source .venv3.10/bin/activate

Здесь и далее все скрипты будем запускать под этой средой.

Также установим RabbitMQ. Я предпочитаю разворачивать его в Docker‑контейнере. Хорошая подробная инструкция по установке здесь.

Начнем с... Конца.:‑)

1. Сервис

Создадим каталог первого микросервиса и перейдем в него:

mkdir srv_clients && cd srv_clients

Создадим структуру файлов сервиса так же одной командой:

touch __init__.py main.py schemas.py database.py models.py querysets.py

__init__.pyслужебный файл пакета python будет пуст

database.py — инициализирует подключение к БД

Код
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker

engine = create_async_engine("sqlite+aiosqlite:///clients.sqlite")
sm = sessionmaker(
    engine, autocommit=False, autoflush=False, class_=AsyncSession
)

Base = declarative_base()

Для работы используем асинхронные сессии и SQLite в качестве СУБД.

models.py — реализует модели структур данных

Код
from sqlalchemy import Column, Integer, Text
from sqlalchemy.orm.collections import InstrumentedList

from srv_clients.database import Base


class BaseModel(Base):
    __abstract__ = True
    id = Column(Integer, primary_key=True, comment="Идентификатор")

    def to_dict(self) -> dict:
        data = dict()
        for k, v in self.__dict__.items():
            if k.startswith("_"):
                continue
            elif isinstance(v, Base):
                v = v.to_dict()
            elif isinstance(v, InstrumentedList):
                v = [item.to_dict() for item in v]
            data[k] = v
        return data

Здесь мы создали базовую абстрактную модель. В ней объявлено поле уникального идентификатора, являющееся также первичным ключом. Кроме того, реализовали простейший рекурсивный метод to_dict для преобразования экземпляра класса в словарь. Да, он не охватывает все ситуации и может вызывать кучу ошибок и требует более детальной проработки, но для примера его более чем достаточно.

class Client(BaseModel):
    __tablename__ = "client"
    __table_args = {"comment": "Таблица клиентов"}

    surname = Column(Text(length=255), nullable=False, comment="Фамилия")
    name = Column(Text(length=255), nullable=False, comment="Имя")
    country = Column(Text(length=255), comment="Страна")

Создадим унаследованную от BaseModel модель Клиента, добавим к ней комментарий и несколько текстовых полей длиной до 255 символов так же с пояснениями.

querysets.py — содержит наборы функций для работы с данными

Код
from typing import List

from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from srv_clients.models import Client


class ClientQueryset:
    model = Client

    @classmethod
    async def create(cls, session: AsyncSession, **kwargs) -> int:
        created = cls.model(**kwargs)
        session.add(created)
        await session.flush([created])
        return created.id

Метод создания записи будет возвращать идентификатор созданной записи. Так как идентификатор является автоинкрементом и в простейших условиях генерируется только после закрытия транзакции, принудительно вызовем метод сессии flush, который сбросит данные в БД и атрибуту id присвоится значение.

    @classmethod
    async def get_by_id(cls, session: AsyncSession, id_: int) -> Client:
        return await session.scalar(
            select(cls.model).where(cls.model.id == id_)
        )

    @classmethod
    async def get_multiple(
        cls,
        session: AsyncSession,
        limit: int = None,
        offset: int = None,
        **filters,
    ) -> List[Client]:
        where = list()
        if surname := filters.get("surname"):
            where.append(cls.model.surname == surname)
        if name := filters.get("name"):
            where.append(cls.model.name == name)
        if country := filters.get("country"):
            where.append(cls.model.country == country)
        return await session.scalars(
            select(cls.model).where(*where).limit(limit).offset(offset)
        )

Для получения клиента по идентификатору и отфильтрованного списка записей используем методы сессии scalar и scalars соответственно. Условия фильтрации ожидаем в виде динамических именованных параметров. Используем простое сравнение полей и добавим возможность реализации пагинации с помощью параметров limit и offset.

    @classmethod
    async def update(cls, session: AsyncSession, id_: int, **kwargs) -> None:
        await session.execute(
            update(cls.model).where(cls.model.id == id_).values(**kwargs)
        )

    @classmethod
    async def delete(cls, session: AsyncSession, id_: int) -> None:
        await session.execute(delete(cls.model).where(cls.model.id == id_))

Все методы обернуты в декоратор @classmethod для возможности вызова без создания экземпляра класса.

schemas.py — описывает входные/выходные схемы данных.

Код
from pydantic import BaseModel, Field


class ClientSchema(BaseModel):
    surname: str = Field(..., description="Фамилия")
    name: str = Field(..., description="Имя")
    country: str = Field(..., description="Страна")


class ClientViewSchema(ClientSchema):
    id: int = Field(..., description="Идентификатор")

Все атрибуты сделаем обязательными (в качестве значения по умолчанию передадим тип ellipsis — ...). К модели просмотра добавим атрибут идентификатора.

main.py — содержит исполняемый код микросервиса.

Код
from typing import List

from fastapi import FastAPI, HTTPException, Request
from fastapi.params import Query
from starlette import status

from srv_clients.database import Base, engine
from srv_clients.database import sm as session_maker
from srv_clients.querysets import ClientQueryset
from srv_clients.schemas import ClientSchema, ClientViewSchema

app = FastAPI()
app.state.engine = engine
app.state.session_maker = session_maker

Создаем приложение, добавляем к нему в качестве атрибутов подключение к базе данных и инициализатор сессий. Делаем это для того, чтобы иметь возможность обращаться к ним внутри эндпоинтов через экземплярRequest.

@app.on_event("startup")
async def on_startup():
    async with app.state.engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


@app.on_event("shutdown")
async def on_shutdown():
    # async with app.state.engine.begin() as conn:
    #     await conn.run_sync(Base.metadata.drop_all)
    await app.state.engine.dispose()

Добавим два обработчика событий приложения:

  • startup— переводим модели SQLAlchemy в таблицы БД;

  • shutdown— очищаем БД путем удаления всех таблиц (закомментировано) и корректно завершаем соединение с БД.

@app.get("/clients", response_model=List[ClientViewSchema])
async def get_filtered_clients(
    request: Request,
    surname: str = Query(None, description="Фамилия"),
    name: str = Query(None, description="Имя"),
    country: str = Query(None, description="Страна"),
    limit: int = Query(
        None, description="Количество записей на странице результатов"
    ),
    offset: int = Query(
        None,
        description="Смещение страницы результатов относительно первой записи",
    ),
):
    async with request.app.state.session_maker() as session:
        rows = await ClientQueryset.get_multiple(
            session,
            surname=surname,
            name=name,
            country=country,
            limit=limit,
            offset=offset,
        )
        return [row.to_dict() for row in rows.all()]


@app.post(
    "/clients",
    response_model=ClientViewSchema,
    status_code=status.HTTP_201_CREATED,
)
async def create_client(request: Request, data: ClientSchema):
    sm = request.app.state.session_maker
    async with sm.begin() as session:
        id_ = await ClientQueryset.create(session, **data.dict())
    async with sm() as session:
        client = await ClientQueryset.get_by_id(session, id_)
        return client.to_dict()


@app.get("/clients/{client_id}", response_model=ClientViewSchema)
async def get_client(request: Request, client_id: int):
    async with request.app.state.session_maker() as session:
        client = await ClientQueryset.get_by_id(session, client_id)
        if not client:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Client not found",
            )
        return client.to_dict()


@app.patch(
    "/clients/{client_id}",
    response_model=ClientViewSchema,
    status_code=status.HTTP_202_ACCEPTED,
)
async def update_client(request: Request, client_id: int, data: ClientSchema):
    sm = request.app.state.session_maker
    async with sm.begin() as session:
        client = await ClientQueryset.get_by_id(session, client_id)
        if not client:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Client not found"
            )
        await ClientQueryset.update(session, client_id, **data.dict())

    async with sm() as session:
        client = await ClientQueryset.get_by_id(session, client_id)
        return client.to_dict()


@app.delete(
    "/clients/{client_id}",
    status_code=status.HTTP_202_ACCEPTED,
)
async def delete_client(request: Request, client_id: int):
    sm = request.app.state.session_maker
    async with sm.begin() as session:
        client = await ClientQueryset.get_by_id(session, client_id)
        if not client:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
            )
        await ClientQueryset.delete(session, client_id)

Реализуем здесь простой CRUD с транзакциями. Для операций, вносящих изменения в БД, откроем транзакцию (создадим точку сохранения — savepoint) с помощью метода begin и менеджера контекста, при выходе из которого она будет успешно закрыта (под капотом будет вызван оператор commit). В случае возникновения каких‑либо ошибок внутри блока with транзакция завершится откатом (будет вызван оператор rollback).

Для разнообразия добавим фильтрацию —get_filtered_clients.

Для случаев, когда запись из БД обязательно должна быть получена (например, при изменении или удалении), выбросим исключение HTTPException когда она не найдена.

Для преобразования ответа в экземпляр схемы, передаваемой в параметре response_modelэндпоинта, или их список, реализующая его функция должна возвращать соответственно словарь или список словарей. Тут нам и пригодится метод to_dict базовой модели данных.

Можно запускать! Напишем команду:

python -m gunicorn --bind 0.0.0.0:8001 --workers 4 -k uvicorn.workers.UvicornWorker main:app

Здесь мы указываем, что сервис будет работать по адресу 0.0.0.0 и на 8001 порте, будет запущено 4 воркера класса UvicornWorker (необязательно — документация в первоисточнике). Приложение инициализируется в переменную app модуля main (main:app).

Имеем примерно такой вывод:

[2023-01-11 21:02:00 +0300] [29518] [INFO] Starting gunicorn 20.1.0
[2023-01-11 21:02:00 +0300] [29518] [INFO] Listening at: http://0.0.0.0:8001 (29518)
[2023-01-11 21:02:00 +0300] [29518] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2023-01-11 21:02:00 +0300] [29520] [INFO] Booting worker with pid: 29520
[2023-01-11 21:02:00 +0300] [29521] [INFO] Booting worker with pid: 29521
[2023-01-11 21:02:00 +0300] [29522] [INFO] Booting worker with pid: 29522
[2023-01-11 21:02:00 +0300] [29523] [INFO] Booting worker with pid: 29523
[2023-01-11 21:02:01 +0300] [29521] [INFO] Started server process [29521]
[2023-01-11 21:02:01 +0300] [29521] [INFO] Waiting for application startup.
[2023-01-11 21:02:01 +0300] [29523] [INFO] Started server process [29523]
[2023-01-11 21:02:01 +0300] [29523] [INFO] Waiting for application startup.
[2023-01-11 21:02:01 +0300] [29520] [INFO] Started server process [29520]
[2023-01-11 21:02:01 +0300] [29520] [INFO] Waiting for application startup.
[2023-01-11 21:02:01 +0300] [29522] [INFO] Started server process [29522]
[2023-01-11 21:02:01 +0300] [29522] [INFO] Waiting for application startup.
[2023-01-11 21:02:01 +0300] [29521] [INFO] Application startup complete.
[2023-01-11 21:02:01 +0300] [29522] [INFO] Application startup complete.
[2023-01-11 21:02:01 +0300] [29523] [INFO] Application startup complete.
[2023-01-11 21:02:01 +0300] [29520] [INFO] Application startup complete.

2. RPC-сервер

Реализуем функционал, который будет получать запросы из очереди, перенаправлять их в соответствующий сервис и возвращать обратно в эту же очередь. На этом этапе осветим два дополнительных вопроса: работу с переменными окружения, в которых будем хранить пароль для подключения к очереди, и собственно работу с очередью RabbitMQ. Кстати, здесь есть хорошая статья о его основных понятия.

Начнем с создания структуры пакета. Вернемся в корень проекта, создадим каталог rpc сервера и перейдем в него:

cd .. && mkdir rpc && cd rpc

Создадим структуру файлов так же одной командой:

touch __init__.py common.py resolver.py server.py

Вложим сюда каталог с настройками и .env файлом:

mkdir config && touch config/.env config/local.yaml

Далее так же по порядку.

__init__.py— служебный файл пакета python будет пуст.

common.py — общие функции работы сервера.

Код

Здесь отступление от первичной цели — хранение и получение настроек в yaml‑файле, а также обращение к переменным окружения из конфигурационного файла. Для этого используем библиотеку PyYaml‑Tags. Пока просто объявим класс и переопределим приватный метод _from_yaml. Вызов будет чуть ниже.

import os
from pathlib import Path

import yaml
from dotenv import load_dotenv
from yaml_tags import BaseTag, tag_registry


@tag_registry.register("env_tag")
class EnvTag(BaseTag):
    def _from_yaml(
        self,
        _loader,
        _work_dir,
        _prefix,
        _suffix,
        param=None,
        *args,
        **kwargs,
    ):
        result = os.environ.get(param, "")
        return f"{_prefix}{result}{_suffix}"

Регистрируем свой тэг env_tag и метод его и формирования.

def load_config(config_path: Path) -> dict:
    tag_registry.require("env_tag")
    env_path = f"{config_path.absolute().parent}/.env"
    load_dotenv(dotenv_path=env_path)
    with open(config_path) as f:
        return yaml.load(f, Loader=yaml.Loader)


def get_config_path(file_name: str) -> Path:
    current_dir = Path(__file__).absolute().parent
    return current_dir / "config" / file_name

Тут мы считаем .env файл. Будем ожидать его в каталоге с конфигурационным(‑ми) файлом(‑ами).

local.yaml:

rmq:
  url: amqp://gateway:<% env_tag(param="RMQ_PASS") %>@127.0.0.1:5672/gateway

В данном случае храним только строку подключения к RabbitMQ.

.env:

RMQ_PASS=gateway

Обращение к зарегистрированному тэгу простое: <% [имя_тэга]([имя_именованного_параметра]=[имя_переменной_окружения]) %>

resolver.py— хранит функции распределения запросов между сервисам.

Код
from typing import Tuple

import aiohttp

SERVICE_MAP = {
    "clients": "0.0.0.0:8001",
    "goods": "0.0.0.0:8002",
}


async def resolve(
    method: str,
    path: str,
    params: dict = None,
    data: dict = None,
    headers: dict = None,
) -> Tuple[dict, int]:
    """
    Функция определяет по первой части url-запроса к какому сервису 
    происходит обращение, перенаправляет в него запрос и возвращает 
    ответ в виде словаря и код статуса.
    """
    _, service_name, *_ = path.split("/")
    service_host = SERVICE_MAP[service_name]
    url = f"http://{service_host}{path}"
    response, status_code = await make_request(
        url, method, params, data, headers
    )
    return response, status_code


async def make_request(
    url: str,
    method: str,
    params: dict = None,
    data: dict = None,
    headers: dict = None,
) -> Tuple[dict, int]:
    """
    Выполнение запроса, тип которого определяется параметром method,
    по url-адресу. Возвращает ответ в виде словаря и код статуса.
    """
    
    if not headers:
        headers = {}
    if not data:
        data = {}

    async with aiohttp.ClientSession() as session:
        request = getattr(session, method)
        async with await request(
            url, params=params, json=data, headers=headers
        ) as response:
            response_json = await response.json()
            return response_json, response.status

server.py— исполняемый код RPC‑сервера.

Код
import asyncio

from aio_pika import connect_robust
from aio_pika.patterns import RPC

from common import get_config_path, load_config
from resolver import resolve


async def server() -> None:
    config = load_config(get_config_path("local.yaml"))
    rmq = config["rmq"]

    connection = await connect_robust(rmq["url"])

    async with connection:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        await rpc.register(resolve.__name__, resolve, auto_delete=True)
    
        try:
            await asyncio.Future()
        finally:
            await connection.close()


if __name__ == "__main__":
    asyncio.run(server())

Здесь все совсем просто. Считываем конфигурационный файл, преобразовываем его в словарь и получаем настройки подключения к очереди RabbitMQ. Создаем это подключение, канал и регистрируем обработчик. Кстати, в пакете aio_pika есть и другие шаблоны, кроме RPC.

Тут также все готово. Можно запускать:

python server.py

Сервер можно распараллелить, просто выполнив команду запуска необходимое количество раз в разных терминалах. Запросы будут автоматически последовательно распределяться между каналами очередей.

Заглянув в графический интерфейс RabbtiMQ, увидим что создается один обменник rpc.dlx:

Вкладка Exchanges
Вкладка Exchanges

И две очереди:

Вкладка Queues
Вкладка Queues

Схема работы будет следующая:

Схема работы RPC посредством RabbitMQ
Схема работы RPC посредством RabbitMQ
  • Когда клиент запускается, он создает анонимную эксклюзивную очередь обратного вызова (в нашем случае amq_0x4e0bc6f2bc4e3df862a47b2b772bec3d);

  • Для запроса RPC клиент отправляет сообщение с двумя свойствами: answer_to, для которого задана очередь обратного вызова, и corellation_id, для которого установлено уникальное значение для каждого запроса;

  • Запрос отправляется в очередь resolve;

  • RPC‑сервер ожидает запросов в этой очереди. Когда появляется запрос, он выполняет работу и отправляет сообщение с результатом обратно Клиенту, используя очередь из поля reply_to;

  • Клиент ожидает данных в очереди обратного вызова. Когда появляется сообщение, оно проверяет свойство corellation_id . Если он соответствует значению из запроса, он возвращает ответ приложению.

3. Gateway

Наш шлюз должен повторять структуру конечных точек всех обслуживаемых сервисов, а также схем входных и выходных данных. Поэтому выделим для каждого сервиса отдельный каталог.

Начнем с создания структуры. Вернемся в корень проекта, создадим каталог gateway и перейдем в него:

cd .. && mkdir gateway && cd gateway

Создадим структуру файлов и каталогов так же одной командой:

touch __init__.py rabbit.py common.py main.py && mkdir config && touch config/.env config/local.yaml && mkdir srv_clients && touch srv_clients/__init__.py srv_clients/routes.py srv_clients/schemas.py

Далее снова по порядку.

__init__.py — служебный файл пакета python традиционно пуст.

rabbit.py — функции работы с очередями RabbitMQ.

Код
from typing import Tuple

from aio_pika import connect_robust
from aio_pika.patterns import RPC


async def send_request_to_queue(
    config: dict, message: dict
) -> Tuple[dict, int]:
    rmq = config["rmq"]
    connection = await connect_robust(rmq["url"])

    async with connection:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        result, status_code = await rpc.proxy.resolve(**message)
        return result, status_code

Из настроек получаем конфигурацию для работы с очередью и открываем соединение. Обозначаем, что хотим вызвать функцию resolve. Она, как известно, возвращает словарь и числовой код статуса.

common.py — общие функции работы сервера.

Код

По большей части он будет повторять код rpc‑сервера с одним дополнением — функцией router.

import os
from functools import wraps
from pathlib import Path
from typing import Any, Optional

import yaml
from dotenv import load_dotenv
from fastapi import HTTPException, Request, Response
from starlette import status
from yaml_tags import BaseTag, tag_registry

from gateway.rabbit import send_request_to_queue


@tag_registry.register("env_tag")
class EnvTag(BaseTag):
    def _from_yaml(
        self,
        _loader,
        _work_dir,
        _prefix,
        _suffix,
        param=None,
        *args,
        **kwargs,
    ) -> str:
        result = os.environ.get(param, "")
        return f"{_prefix}{result}{_suffix}"


def load_config(config_path: Path) -> dict:
    tag_registry.require("env_tag")
    env_path = f"{config_path.absolute().parent}/.env"
    load_dotenv(dotenv_path=env_path)
    with open(config_path) as f:
        return yaml.load(f, Loader=yaml.Loader)


def get_config_path(file_name: str) -> Path:
    current_dir = Path(__file__).absolute().parent
    return current_dir / "config" / file_name


def router(
    method,
    path: str,
    data_key: Optional[str] = None,
    status_code: Optional[int] = status.HTTP_200_OK,
    response_model: Optional[Any] = None,
):
    """
    Обертка функции эндпоинта, реализующая обращение к RPC-серверу.

    :param method: Вызываемый объект (функция) реализующая тот или 
                   иной метод http-запроса.
    :param path: Адрес эндпоинта.
    :param data_key: Имя ключа, в котором передаются данные 
                     на эндпоинт.
    :param status_code: Ожидаемый код ответа сервера.
    :param response_model: Модель данных для преобразования ответа.
    """
    app_method = method(
        path, status_code=status_code, response_model=response_model
    )

    def wrapper(endpoint):
        @app_method
        @wraps(endpoint)
        async def decorator(request: Request, response: Response, **kwargs):
            request_method = request.scope["method"].lower()
            data = kwargs.get(data_key)
            data = data.dict() if data else {}
            response_data, response_status_code = await send_request_to_queue(
                config=request.app.config,
                message={
                    "method": request_method,
                    "path": request.scope["path"],
                    "params": dict(request.query_params),
                    "data": data,
                    "headers": {},
                },
            )
            if response_status_code >= status.HTTP_400_BAD_REQUEST:
                raise HTTPException(
                    status_code=response_status_code, detail=response_data
                )
            response.status_code = response_status_code
            return response_data

    return wrapper

Функция router реализует декоратор, который в свою очередь традиционным методом оборачивает эндпоинт в параметризированный декоратор. Из переданных параметров собирается объект сообщения для направления в один из сервисов. Сообщение направляется в очередь и ожидается ответ. Если код ответа информирует об ошибке (>= 400), выбрасываем исключение с полученным статусом и сообщением. В случае успеха, подменяем статус объекта response кодом, полученным от сервиса.

На данном этапе функционал можно дополнить, например, проверкой учетных данных, каким‑то логированием и всяческими плюшками. Для примера ограничимся простыми запросом‑ответом.

main.py — исполняемый код шлюза.

Код
from fastapi import FastAPI

from common import get_config_path, load_config
from gateway.srv_clients.routes import clients_router

app = FastAPI()
app.config = load_config(get_config_path("local.yaml"))
app.include_router(clients_router)

Здесь мы инициализируем приложение, добавляем к нему в качестве атрибута config словарь с настройками. Делаем это опять же для того, чтобы иметь возможность обращаться к ним внутри эндпоинтов через экземплярRequest, а именно в декораторе route.

Как я говорил выше, описываем сервисы, частично повторяя их структуру.

routes.py — конечные точки входа в сервис (endpoints).

Код
from typing import List

from fastapi import APIRouter, Response
from fastapi.params import Query
from starlette import status
from starlette.requests import Request

from gateway.common import router
from gateway.srv_clients.schemas import ClientSchema, ClientViewSchema

clients_router = APIRouter(prefix="/clients")


@router(
    method=clients_router.get,
    path="",
    response_model=List[ClientViewSchema],
)
async def get_filtered_clients(
    request: Request,
    response: Response,
    surname: str = Query(None, description="Фамилия"),
    name: str = Query(None, description="Имя"),
    country: str = Query(None, description="Страна"),
    limit: int = Query(
        None, description="Количество записей на странице результатов"
    ),
    offset: int = Query(
        None,
        description="Смещение страницы результатов относительно первой записи",
    ),
):
    pass


@router(
    method=clients_router.post,
    path="",
    response_model=ClientViewSchema,
    status_code=status.HTTP_201_CREATED,
    data_key="data",
)
async def create_client(
    request: Request, response: Response, data: ClientSchema
):
    pass


@router(
    method=clients_router.get,
    path="/{client_id}",
    response_model=ClientViewSchema,
)
async def get_client(request: Request, response: Response, client_id: int):
    pass


@router(
    method=clients_router.patch,
    path="/{client_id}",
    response_model=ClientViewSchema,
    status_code=status.HTTP_202_ACCEPTED,
    data_key="data",
)
async def update_client(
    request: Request, response: Response, client_id: int, data: ClientSchema
):
    pass


@router(
    method=clients_router.delete,
    path="/{client_id}",
    status_code=status.HTTP_202_ACCEPTED,
)
async def delete_client(request: Request, response: Response, client_id: int):
    pass

Все эндпоинты не содержат код. Запросы выполняются внутри декоратора, в который мы оборачиваем функцию.

schemas.py — Схемы входных и выходных данных.

Код
from pydantic import BaseModel, Field


class ClientSchema(BaseModel):
    surname: str = Field(..., description="Фамилия")
    name: str = Field(..., description="Имя")
    country: str = Field(..., description="Страна")


class ClientViewSchema(ClientSchema):
    id: int = Field(..., description="Идентификатор")

Здесь полная копия схем сервиса.

Готово! Запустим шлюз на 8000-м порте:

python -m gunicorn --bind 0.0.0.0:8000 --workers 4 -k uvicorn.workers.UvicornWorker main:app

Пробы

Вся связка готова. Можно запускать все вместе и применять функционал на практике.

Создадим клиента:

POST http://localhost:8000/clients
Content-Type: application/json

{
  "surname": "John",
  "name": "Malkovitch",
  "country": "USA"
}

И получим ответ:

HTTP/1.1 201 Created
date: Mon, 06 Feb 2023 10:43:10 GMT
server: uvicorn
content-length: 61
content-type: application/json

{
  "surname": "John",
  "name": "Malkovitch",
  "country": "USA",
  "id": 1
}

Запросим список клиентов:

GET http://localhost:8000/clients
Content-Type: application/json

Результат будет таким:

HTTP/1.1 200 OK
date: Mon, 06 Feb 2023 10:45:12 GMT
server: uvicorn
content-length: 63
content-type: application/json

[
  {
    "surname": "John",
    "name": "Malkovitch",
    "country": "USA",
    "id": 1
  }
]

Ожидаемо получаем список json‑объектов.

Так же запросим запись по идентификатору:

GET http://localhost:8000/clients/1
Content-Type: application/json

Результат будет таким:

HTTP/1.1 200 OK
date: Mon, 06 Feb 2023 10:48:26 GMT
server: uvicorn
content-length: 61
content-type: application/json

{
  "surname": "John",
  "name": "Malkovitch",
  "country": "USA",
  "id": 1
}

Изменим нашего Джона Малковича из США на Джона Смита из Британии:

PATCH http://localhost:8000/clients/1
Content-Type: application/json

{
  "surname": "John",
  "name": "Smith",
  "country": "Great Britain"
}

В ответ получаем измененную запись:

HTTP/1.1 202 Accepted
date: Mon, 06 Feb 2023 10:52:02 GMT
server: uvicorn
content-length: 66
content-type: application/json

{
  "surname": "John",
  "name": "Smith",
  "country": "Great Britain",
  "id": 1
}

А теперь удалим его:

DELETE http://localhost:8000/clients/1
Content-Type: application/json

В ответе теперь нас интересует лишь статус и он 202-й:

HTTP/1.1 202 Accepted
date: Mon, 06 Feb 2023 10:52:37 GMT
server: uvicorn
content-length: 4
content-type: application/json

null

Вот и всё. Проделано много работы и написано много кода. RPC достаточно старая, но хорошо себя зарекомендовавшая технология. Данный вариант взаимодействия пользователя с сервисами не ультимативен и может быть реализован с использованием иных технологий. Спасибо, что дочитали до конца. До скорых встреч!

Комментарии (19)


  1. SergeyDeryabin
    06.02.2023 16:12
    +2

    А зачем на самой первой картинке нужен RabbitMQ? Почему нельзя сразу на бэк запросы прикидывать?


    1. v_suntsov Автор
      06.02.2023 16:17
      -3

      Самая главная причина изложена в заголовке — такая поставлена задача.


      1. SergeyDeryabin
        06.02.2023 16:19

        Это я понял, зачем через Rabbit все гнать? В чем плюс использования Rabbit вместо проксирования на бэк без очереди?


        1. affinity
          06.02.2023 16:53
          +1

          вероятно если инстансов какого-то сервиса несколько, то что бы не заниматься балансировкой (и вообще не знать сколько их там и в какой момент времени) - проще так. А там кто первый взял сообщение - тот и пашет. Но в статье про это ничего и нет :(


          1. SergeyDeryabin
            06.02.2023 16:54

            Инстансы чего, RPC? Он на схеме один. Инстансы идут уже за RPC и gunicorn


            1. affinity
              06.02.2023 16:56

              Справедливо. На схеме нет вариантов что бы тех же RPC было несколько. Прошу прощения - не внимательность :)


              1. SergeyDeryabin
                06.02.2023 16:59

                Если это сделано для эксперимента и развития, то это одно.

                А если в рабочем проекте - то я вижу только перегрузку архитектуры и дополнительную точку отказа.


                1. ovalsky
                  06.02.2023 21:27

                  Т.е. это "туториал" для эксперимента?


                  1. v_suntsov Автор
                    06.02.2023 21:36

                    Да. Эксперимент. Мой личный. Не на все вопросы я быстро находил ответы в процессе решения в русскоязычном интернете, поэтому решил все собрать воедино в одном месте.


              1. v_suntsov Автор
                06.02.2023 17:00

                На схеме - да, действительно один, но в тексте я указал, что их так же можно запустить n-ое количество.


        1. v_suntsov Автор
          06.02.2023 17:03

          Вопрос изоляции частей архитектуры. Наружу микросервисная часть не смотрит вообще и может быть максимально изолирована. В случае получения доступа за шлюз, доступа к критической инфраструктуре не будет. Ну и да — балансировки нагрузки.


          1. SergeyDeryabin
            06.02.2023 17:08

            Вопрос изоляции частей архитектуры. Наружу микросервисная часть не смотрит вообще и может быть максимально изолирована. В случае получения доступа за шлюз, доступа к критической инфраструктуре не будет.

            Ничего не изменится, если API Gateway будет сам проксировать на RPC

            Ну и да — балансировки нагрузки.

            Балансировать нагрузку может API Gateway, у брокера другая задача


          1. OlegSpectr
            07.02.2023 07:40
            +1

            У вас по сути за всё это и должен отвечать API Gateway. Сейчас же RabbitMQ используется ради того, чтобы был RabbitMQ, каких-то плюсов в такой реализации нет, более того, сильно увеличивается время ответа


      1. andrejbestuzhev
        06.02.2023 17:04
        +4

        Вы всё-таки решаете какую-то проблему и публике очень хочется понять: какую именно? Вместо этого вы предлагаете туториал, как решать неизвестную проблему.


        1. v_suntsov Автор
          06.02.2023 17:05

          Проблема проста - решение поставленной задачи. Мой интерес организовать заявленную архитектуру. В общем-то, статья об этом.


          1. SergeyDeryabin
            06.02.2023 17:17

            Давайте тогда разберем Вашу задачу

            тимлид поставил передо мной задачу реализовать механизм взаимодействия пользователя через веб-интерфейс с микросервисами через единую точку входа с использованием FastAPI и RabbitMQ.

            В задаче не было указано, для каких целей использовать RabbitMQ? Какие будут преимущества от этого? У Вас банковское приложение или биржевое, где необходимо сохранить запросы на случай падения и вернуться после восстановления к их обработке?


            1. v_suntsov Автор
              06.02.2023 17:31

              Вообще, это было тестовое задание. В общих чертах рассказано об архитектуре и предложено реализовать свое видение. Это прототип, лежащий лишь в репозитории GitHub-а. Приложение не банковское, но реализуется для достаточно крупной организации.


              1. andrejbestuzhev
                06.02.2023 17:51
                +4

                У меня бы возникло множество вопросов к такой постановке задачи. Если это какой-то сервис в вакууме - ну ок.
                1. Почему приложению искусственно ограничивают возможность использовать доступные ядра на 100%? (Если gunicorn такое умеет, конечно, я не знаю), т.к. rabbitmq будет передавать сообщения значительно медленнее, чем утилизируются ресурсы процессора.
                2. Почему именно rabbitmq, а не kafka?
                3. Что будет, если сервис за rabbitmq откажет, что получит пользователь и когда?
                4. Что будет в случае отказа rabbitmq?
                Вообще, если мы используем очереди, мы предполагаем, что ответ на запрос поступит "когда-нибудь", а в вашей схеме пользователь отправляет, по всей видимости, синхронный запрос. Это повлечёт очень большие задержки, если кто-то в беке затупит. Следовательно, на фронте тоже должен будет стоять какой-то листенер, который будет обрабатывать "ответы" из очереди - ещё одна точка отказа
                Я понял, что не вы такую схему придумали, но всё же было бы здорово выяснить у того, кто такую задачу поставил: зачем вообще порождать такого монстра?


                1. bondeg
                  09.02.2023 05:03

                  Чтобы говорить "у нас крутая распределенная архитектура и микросервисы", вестимо. Это как в отзывах одного приложения хвастались "увеличили скорость отдачи бонусного баланса с 30 до 15 секунд". Нет, это не опечатка, именно секунд.