Однажды тимлид поставил передо мной задачу реализовать механизм взаимодействия пользователя через веб-интерфейс с микросервисами через единую точку входа с использованием FastAPI и RabbitMQ. Спешу поделиться с тобой, мой читатель, тем, что у меня получилось. По мере повествования дам пояснения по представленному коду. И, да, сделаю интересные отступления по вопросам валидации и хранения, в т.ч. приватных, данных.
Задача
Пользователь через единый веб-интерфейс («Front» side) должен обращаться к микросервисной архитектуре, расположенной в закрытом контуре («Back» side). Взаимодействие должно быть построено посредством очередей RabbitMQ. Все работает под управлением Python, FastAPI и Gunicorn. В графическом виде получается что‑то такое:
Планирование
API-шлюз будет принимать запрос от пользователя и исходя из частностей поставленной задачи интерпретировать. Далее будет отправлять в определенном формате сообщение в очередь RabbitMQ и переходить в режим «ожидания». С другой стороны RabbitMQ так же в режиме ожидания сообщений будет работать RPC-сервер. При получении запроса он определит, в какое из приложений его перенаправить, запулит его в сторону соответствующего Gunicorn, который распределит нагрузку между экземплярами микросервиса и полученный ответ вернет RPC-серверу, а тот в свою очередь прокинет его дальше в специально созданную очередь. С той стороны «ждущий» шлюз получит это сообщение, выйдет из режима ожидания и сформированный ответ вернет пользователю.
Исполнение
Объединим все модули реализуемой схемы в одном проекте, подразумевая при этом, что у каждой составляющей может быть свое хранилище, виртуальное пространство, способы и команды запуска и т.д. Сервисы будут простые простейшие, но достаточные для представления и понимания общей концепции. Полный исходный код в моем репозитории.
Создадим корневой каталог и перейдем в него:
mkdir api-gateway && cd api-gateway
Сформируем файл с основными используемыми библиотеками и назовем его традиционно requirements.txt, остальные подтянутся зависимостями:
echo "
aiohttp==3.8.3
aio-pika==8.2.5
fastapi==0.88.0
gunicorn==20.1.0
path==16.5.0
PyYAML==6.0
pyyaml-tags==1.0.4
six==1.16.0
SQLAlchemy==1.4.45
uvicorn==0.20.0
" > requirements.txt
Создадим виртуальное окружение в каталоге.venv3.10 и активируем его. Кстати, я использую Python 3.10.6 под Ubuntu 20.04:
python3 -m venv .venv3.10 && source .venv3.10/bin/activate
Здесь и далее все скрипты будем запускать под этой средой.
Также установим RabbitMQ. Я предпочитаю разворачивать его в Docker‑контейнере. Хорошая подробная инструкция по установке здесь.
Начнем с... Конца.:‑)
1. Сервис
Создадим каталог первого микросервиса и перейдем в него:
mkdir srv_clients && cd srv_clients
Создадим структуру файлов сервиса так же одной командой:
touch __init__.py main.py schemas.py database.py models.py querysets.py
__init__.py
— служебный файл пакета python будет пуст
database.py
— инициализирует подключение к БД
Код
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker
engine = create_async_engine("sqlite+aiosqlite:///clients.sqlite")
sm = sessionmaker(
engine, autocommit=False, autoflush=False, class_=AsyncSession
)
Base = declarative_base()
Для работы используем асинхронные сессии и SQLite в качестве СУБД.
models.py
— реализует модели структур данных
Код
from sqlalchemy import Column, Integer, Text
from sqlalchemy.orm.collections import InstrumentedList
from srv_clients.database import Base
class BaseModel(Base):
__abstract__ = True
id = Column(Integer, primary_key=True, comment="Идентификатор")
def to_dict(self) -> dict:
data = dict()
for k, v in self.__dict__.items():
if k.startswith("_"):
continue
elif isinstance(v, Base):
v = v.to_dict()
elif isinstance(v, InstrumentedList):
v = [item.to_dict() for item in v]
data[k] = v
return data
Здесь мы создали базовую абстрактную модель. В ней объявлено поле уникального идентификатора, являющееся также первичным ключом. Кроме того, реализовали простейший рекурсивный метод to_dict
для преобразования экземпляра класса в словарь. Да, он не охватывает все ситуации и может вызывать кучу ошибок и требует более детальной проработки, но для примера его более чем достаточно.
class Client(BaseModel):
__tablename__ = "client"
__table_args = {"comment": "Таблица клиентов"}
surname = Column(Text(length=255), nullable=False, comment="Фамилия")
name = Column(Text(length=255), nullable=False, comment="Имя")
country = Column(Text(length=255), comment="Страна")
Создадим унаследованную от BaseModel
модель Клиента, добавим к ней комментарий и несколько текстовых полей длиной до 255 символов так же с пояснениями.
querysets.py
— содержит наборы функций для работы с данными
Код
from typing import List
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from srv_clients.models import Client
class ClientQueryset:
model = Client
@classmethod
async def create(cls, session: AsyncSession, **kwargs) -> int:
created = cls.model(**kwargs)
session.add(created)
await session.flush([created])
return created.id
Метод создания записи будет возвращать идентификатор созданной записи. Так как идентификатор является автоинкрементом и в простейших условиях генерируется только после закрытия транзакции, принудительно вызовем метод сессии flush
, который сбросит данные в БД и атрибуту id
присвоится значение.
@classmethod
async def get_by_id(cls, session: AsyncSession, id_: int) -> Client:
return await session.scalar(
select(cls.model).where(cls.model.id == id_)
)
@classmethod
async def get_multiple(
cls,
session: AsyncSession,
limit: int = None,
offset: int = None,
**filters,
) -> List[Client]:
where = list()
if surname := filters.get("surname"):
where.append(cls.model.surname == surname)
if name := filters.get("name"):
where.append(cls.model.name == name)
if country := filters.get("country"):
where.append(cls.model.country == country)
return await session.scalars(
select(cls.model).where(*where).limit(limit).offset(offset)
)
Для получения клиента по идентификатору и отфильтрованного списка записей используем методы сессии scalar
и scalars
соответственно. Условия фильтрации ожидаем в виде динамических именованных параметров. Используем простое сравнение полей и добавим возможность реализации пагинации с помощью параметров limit
и offset
.
@classmethod
async def update(cls, session: AsyncSession, id_: int, **kwargs) -> None:
await session.execute(
update(cls.model).where(cls.model.id == id_).values(**kwargs)
)
@classmethod
async def delete(cls, session: AsyncSession, id_: int) -> None:
await session.execute(delete(cls.model).where(cls.model.id == id_))
Все методы обернуты в декоратор @classmethod
для возможности вызова без создания экземпляра класса.
schemas.py
— описывает входные/выходные схемы данных.
Код
from pydantic import BaseModel, Field
class ClientSchema(BaseModel):
surname: str = Field(..., description="Фамилия")
name: str = Field(..., description="Имя")
country: str = Field(..., description="Страна")
class ClientViewSchema(ClientSchema):
id: int = Field(..., description="Идентификатор")
Все атрибуты сделаем обязательными (в качестве значения по умолчанию передадим тип ellipsis
— ...
). К модели просмотра добавим атрибут идентификатора.
main.py
— содержит исполняемый код микросервиса.
Код
from typing import List
from fastapi import FastAPI, HTTPException, Request
from fastapi.params import Query
from starlette import status
from srv_clients.database import Base, engine
from srv_clients.database import sm as session_maker
from srv_clients.querysets import ClientQueryset
from srv_clients.schemas import ClientSchema, ClientViewSchema
app = FastAPI()
app.state.engine = engine
app.state.session_maker = session_maker
Создаем приложение, добавляем к нему в качестве атрибутов подключение к базе данных и инициализатор сессий. Делаем это для того, чтобы иметь возможность обращаться к ним внутри эндпоинтов через экземплярRequest
.
@app.on_event("startup")
async def on_startup():
async with app.state.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@app.on_event("shutdown")
async def on_shutdown():
# async with app.state.engine.begin() as conn:
# await conn.run_sync(Base.metadata.drop_all)
await app.state.engine.dispose()
Добавим два обработчика событий приложения:
startup
— переводим модели SQLAlchemy в таблицы БД;shutdown
— очищаем БД путем удаления всех таблиц (закомментировано) и корректно завершаем соединение с БД.
@app.get("/clients", response_model=List[ClientViewSchema])
async def get_filtered_clients(
request: Request,
surname: str = Query(None, description="Фамилия"),
name: str = Query(None, description="Имя"),
country: str = Query(None, description="Страна"),
limit: int = Query(
None, description="Количество записей на странице результатов"
),
offset: int = Query(
None,
description="Смещение страницы результатов относительно первой записи",
),
):
async with request.app.state.session_maker() as session:
rows = await ClientQueryset.get_multiple(
session,
surname=surname,
name=name,
country=country,
limit=limit,
offset=offset,
)
return [row.to_dict() for row in rows.all()]
@app.post(
"/clients",
response_model=ClientViewSchema,
status_code=status.HTTP_201_CREATED,
)
async def create_client(request: Request, data: ClientSchema):
sm = request.app.state.session_maker
async with sm.begin() as session:
id_ = await ClientQueryset.create(session, **data.dict())
async with sm() as session:
client = await ClientQueryset.get_by_id(session, id_)
return client.to_dict()
@app.get("/clients/{client_id}", response_model=ClientViewSchema)
async def get_client(request: Request, client_id: int):
async with request.app.state.session_maker() as session:
client = await ClientQueryset.get_by_id(session, client_id)
if not client:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Client not found",
)
return client.to_dict()
@app.patch(
"/clients/{client_id}",
response_model=ClientViewSchema,
status_code=status.HTTP_202_ACCEPTED,
)
async def update_client(request: Request, client_id: int, data: ClientSchema):
sm = request.app.state.session_maker
async with sm.begin() as session:
client = await ClientQueryset.get_by_id(session, client_id)
if not client:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Client not found"
)
await ClientQueryset.update(session, client_id, **data.dict())
async with sm() as session:
client = await ClientQueryset.get_by_id(session, client_id)
return client.to_dict()
@app.delete(
"/clients/{client_id}",
status_code=status.HTTP_202_ACCEPTED,
)
async def delete_client(request: Request, client_id: int):
sm = request.app.state.session_maker
async with sm.begin() as session:
client = await ClientQueryset.get_by_id(session, client_id)
if not client:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
await ClientQueryset.delete(session, client_id)
Реализуем здесь простой CRUD с транзакциями. Для операций, вносящих изменения в БД, откроем транзакцию (создадим точку сохранения — savepoint) с помощью метода begin
и менеджера контекста, при выходе из которого она будет успешно закрыта (под капотом будет вызван оператор commit
). В случае возникновения каких‑либо ошибок внутри блока with
транзакция завершится откатом (будет вызван оператор rollback
).
Для разнообразия добавим фильтрацию —get_filtered_clients
.
Для случаев, когда запись из БД обязательно должна быть получена (например, при изменении или удалении), выбросим исключение HTTPException
когда она не найдена.
Для преобразования ответа в экземпляр схемы, передаваемой в параметре response_model
эндпоинта, или их список, реализующая его функция должна возвращать соответственно словарь или список словарей. Тут нам и пригодится метод to_dict
базовой модели данных.
Можно запускать! Напишем команду:
python -m gunicorn --bind 0.0.0.0:8001 --workers 4 -k uvicorn.workers.UvicornWorker main:app
Здесь мы указываем, что сервис будет работать по адресу 0.0.0.0
и на 8001
порте, будет запущено 4 воркера класса UvicornWorker
(необязательно — документация в первоисточнике). Приложение инициализируется в переменную app
модуля main
(main:app
).
Имеем примерно такой вывод:
[2023-01-11 21:02:00 +0300] [29518] [INFO] Starting gunicorn 20.1.0
[2023-01-11 21:02:00 +0300] [29518] [INFO] Listening at: http://0.0.0.0:8001 (29518)
[2023-01-11 21:02:00 +0300] [29518] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2023-01-11 21:02:00 +0300] [29520] [INFO] Booting worker with pid: 29520
[2023-01-11 21:02:00 +0300] [29521] [INFO] Booting worker with pid: 29521
[2023-01-11 21:02:00 +0300] [29522] [INFO] Booting worker with pid: 29522
[2023-01-11 21:02:00 +0300] [29523] [INFO] Booting worker with pid: 29523
[2023-01-11 21:02:01 +0300] [29521] [INFO] Started server process [29521]
[2023-01-11 21:02:01 +0300] [29521] [INFO] Waiting for application startup.
[2023-01-11 21:02:01 +0300] [29523] [INFO] Started server process [29523]
[2023-01-11 21:02:01 +0300] [29523] [INFO] Waiting for application startup.
[2023-01-11 21:02:01 +0300] [29520] [INFO] Started server process [29520]
[2023-01-11 21:02:01 +0300] [29520] [INFO] Waiting for application startup.
[2023-01-11 21:02:01 +0300] [29522] [INFO] Started server process [29522]
[2023-01-11 21:02:01 +0300] [29522] [INFO] Waiting for application startup.
[2023-01-11 21:02:01 +0300] [29521] [INFO] Application startup complete.
[2023-01-11 21:02:01 +0300] [29522] [INFO] Application startup complete.
[2023-01-11 21:02:01 +0300] [29523] [INFO] Application startup complete.
[2023-01-11 21:02:01 +0300] [29520] [INFO] Application startup complete.
2. RPC-сервер
Реализуем функционал, который будет получать запросы из очереди, перенаправлять их в соответствующий сервис и возвращать обратно в эту же очередь. На этом этапе осветим два дополнительных вопроса: работу с переменными окружения, в которых будем хранить пароль для подключения к очереди, и собственно работу с очередью RabbitMQ. Кстати, здесь есть хорошая статья о его основных понятия.
Начнем с создания структуры пакета. Вернемся в корень проекта, создадим каталог rpc
сервера и перейдем в него:
cd .. && mkdir rpc && cd rpc
Создадим структуру файлов так же одной командой:
touch __init__.py common.py resolver.py server.py
Вложим сюда каталог с настройками и .env
файлом:
mkdir config && touch config/.env config/local.yaml
Далее так же по порядку.
__init__.py
— служебный файл пакета python будет пуст.
common.py
— общие функции работы сервера.
Код
Здесь отступление от первичной цели — хранение и получение настроек в yaml‑файле, а также обращение к переменным окружения из конфигурационного файла. Для этого используем библиотеку PyYaml‑Tags. Пока просто объявим класс и переопределим приватный метод _from_yaml
. Вызов будет чуть ниже.
import os
from pathlib import Path
import yaml
from dotenv import load_dotenv
from yaml_tags import BaseTag, tag_registry
@tag_registry.register("env_tag")
class EnvTag(BaseTag):
def _from_yaml(
self,
_loader,
_work_dir,
_prefix,
_suffix,
param=None,
*args,
**kwargs,
):
result = os.environ.get(param, "")
return f"{_prefix}{result}{_suffix}"
Регистрируем свой тэг env_tag
и метод его и формирования.
def load_config(config_path: Path) -> dict:
tag_registry.require("env_tag")
env_path = f"{config_path.absolute().parent}/.env"
load_dotenv(dotenv_path=env_path)
with open(config_path) as f:
return yaml.load(f, Loader=yaml.Loader)
def get_config_path(file_name: str) -> Path:
current_dir = Path(__file__).absolute().parent
return current_dir / "config" / file_name
Тут мы считаем .env файл. Будем ожидать его в каталоге с конфигурационным(‑ми) файлом(‑ами).
local.yaml:
rmq:
url: amqp://gateway:<% env_tag(param="RMQ_PASS") %>@127.0.0.1:5672/gateway
В данном случае храним только строку подключения к RabbitMQ.
.env:
RMQ_PASS=gateway
Обращение к зарегистрированному тэгу простое: <% [имя_тэга]([имя_именованного_параметра]=[имя_переменной_окружения]) %>
resolver.py
— хранит функции распределения запросов между сервисам.
Код
from typing import Tuple
import aiohttp
SERVICE_MAP = {
"clients": "0.0.0.0:8001",
"goods": "0.0.0.0:8002",
}
async def resolve(
method: str,
path: str,
params: dict = None,
data: dict = None,
headers: dict = None,
) -> Tuple[dict, int]:
"""
Функция определяет по первой части url-запроса к какому сервису
происходит обращение, перенаправляет в него запрос и возвращает
ответ в виде словаря и код статуса.
"""
_, service_name, *_ = path.split("/")
service_host = SERVICE_MAP[service_name]
url = f"http://{service_host}{path}"
response, status_code = await make_request(
url, method, params, data, headers
)
return response, status_code
async def make_request(
url: str,
method: str,
params: dict = None,
data: dict = None,
headers: dict = None,
) -> Tuple[dict, int]:
"""
Выполнение запроса, тип которого определяется параметром method,
по url-адресу. Возвращает ответ в виде словаря и код статуса.
"""
if not headers:
headers = {}
if not data:
data = {}
async with aiohttp.ClientSession() as session:
request = getattr(session, method)
async with await request(
url, params=params, json=data, headers=headers
) as response:
response_json = await response.json()
return response_json, response.status
server.py
— исполняемый код RPC‑сервера.
Код
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
from common import get_config_path, load_config
from resolver import resolve
async def server() -> None:
config = load_config(get_config_path("local.yaml"))
rmq = config["rmq"]
connection = await connect_robust(rmq["url"])
async with connection:
channel = await connection.channel()
rpc = await RPC.create(channel)
await rpc.register(resolve.__name__, resolve, auto_delete=True)
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(server())
Здесь все совсем просто. Считываем конфигурационный файл, преобразовываем его в словарь и получаем настройки подключения к очереди RabbitMQ. Создаем это подключение, канал и регистрируем обработчик. Кстати, в пакете aio_pika
есть и другие шаблоны, кроме RPC.
Тут также все готово. Можно запускать:
python server.py
Сервер можно распараллелить, просто выполнив команду запуска необходимое количество раз в разных терминалах. Запросы будут автоматически последовательно распределяться между каналами очередей.
Заглянув в графический интерфейс RabbtiMQ, увидим что создается один обменник rpc.dlx
:
И две очереди:
Схема работы будет следующая:
Когда клиент запускается, он создает анонимную эксклюзивную очередь обратного вызова (в нашем случае
amq_0x4e0bc6f2bc4e3df862a47b2b772bec3d
);Для запроса RPC клиент отправляет сообщение с двумя свойствами:
answer_to
, для которого задана очередь обратного вызова, иcorellation_id
, для которого установлено уникальное значение для каждого запроса;Запрос отправляется в очередь
resolve
;RPC‑сервер ожидает запросов в этой очереди. Когда появляется запрос, он выполняет работу и отправляет сообщение с результатом обратно Клиенту, используя очередь из поля
reply_to
;Клиент ожидает данных в очереди обратного вызова. Когда появляется сообщение, оно проверяет свойство
corellation_id
. Если он соответствует значению из запроса, он возвращает ответ приложению.
3. Gateway
Наш шлюз должен повторять структуру конечных точек всех обслуживаемых сервисов, а также схем входных и выходных данных. Поэтому выделим для каждого сервиса отдельный каталог.
Начнем с создания структуры. Вернемся в корень проекта, создадим каталог gateway
и перейдем в него:
cd .. && mkdir gateway && cd gateway
Создадим структуру файлов и каталогов так же одной командой:
touch __init__.py rabbit.py common.py main.py && mkdir config && touch config/.env config/local.yaml && mkdir srv_clients && touch srv_clients/__init__.py srv_clients/routes.py srv_clients/schemas.py
Далее снова по порядку.
__init__.py
— служебный файл пакета python традиционно пуст.
rabbit.py
— функции работы с очередями RabbitMQ.
Код
from typing import Tuple
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def send_request_to_queue(
config: dict, message: dict
) -> Tuple[dict, int]:
rmq = config["rmq"]
connection = await connect_robust(rmq["url"])
async with connection:
channel = await connection.channel()
rpc = await RPC.create(channel)
result, status_code = await rpc.proxy.resolve(**message)
return result, status_code
Из настроек получаем конфигурацию для работы с очередью и открываем соединение. Обозначаем, что хотим вызвать функцию resolve
. Она, как известно, возвращает словарь и числовой код статуса.
common.py
— общие функции работы сервера.
Код
По большей части он будет повторять код rpc‑сервера с одним дополнением — функцией router
.
import os
from functools import wraps
from pathlib import Path
from typing import Any, Optional
import yaml
from dotenv import load_dotenv
from fastapi import HTTPException, Request, Response
from starlette import status
from yaml_tags import BaseTag, tag_registry
from gateway.rabbit import send_request_to_queue
@tag_registry.register("env_tag")
class EnvTag(BaseTag):
def _from_yaml(
self,
_loader,
_work_dir,
_prefix,
_suffix,
param=None,
*args,
**kwargs,
) -> str:
result = os.environ.get(param, "")
return f"{_prefix}{result}{_suffix}"
def load_config(config_path: Path) -> dict:
tag_registry.require("env_tag")
env_path = f"{config_path.absolute().parent}/.env"
load_dotenv(dotenv_path=env_path)
with open(config_path) as f:
return yaml.load(f, Loader=yaml.Loader)
def get_config_path(file_name: str) -> Path:
current_dir = Path(__file__).absolute().parent
return current_dir / "config" / file_name
def router(
method,
path: str,
data_key: Optional[str] = None,
status_code: Optional[int] = status.HTTP_200_OK,
response_model: Optional[Any] = None,
):
"""
Обертка функции эндпоинта, реализующая обращение к RPC-серверу.
:param method: Вызываемый объект (функция) реализующая тот или
иной метод http-запроса.
:param path: Адрес эндпоинта.
:param data_key: Имя ключа, в котором передаются данные
на эндпоинт.
:param status_code: Ожидаемый код ответа сервера.
:param response_model: Модель данных для преобразования ответа.
"""
app_method = method(
path, status_code=status_code, response_model=response_model
)
def wrapper(endpoint):
@app_method
@wraps(endpoint)
async def decorator(request: Request, response: Response, **kwargs):
request_method = request.scope["method"].lower()
data = kwargs.get(data_key)
data = data.dict() if data else {}
response_data, response_status_code = await send_request_to_queue(
config=request.app.config,
message={
"method": request_method,
"path": request.scope["path"],
"params": dict(request.query_params),
"data": data,
"headers": {},
},
)
if response_status_code >= status.HTTP_400_BAD_REQUEST:
raise HTTPException(
status_code=response_status_code, detail=response_data
)
response.status_code = response_status_code
return response_data
return wrapper
Функция router
реализует декоратор, который в свою очередь традиционным методом оборачивает эндпоинт в параметризированный декоратор. Из переданных параметров собирается объект сообщения для направления в один из сервисов. Сообщение направляется в очередь и ожидается ответ. Если код ответа информирует об ошибке (>= 400), выбрасываем исключение с полученным статусом и сообщением. В случае успеха, подменяем статус объекта response
кодом, полученным от сервиса.
На данном этапе функционал можно дополнить, например, проверкой учетных данных, каким‑то логированием и всяческими плюшками. Для примера ограничимся простыми запросом‑ответом.
main.py
— исполняемый код шлюза.
Код
from fastapi import FastAPI
from common import get_config_path, load_config
from gateway.srv_clients.routes import clients_router
app = FastAPI()
app.config = load_config(get_config_path("local.yaml"))
app.include_router(clients_router)
Здесь мы инициализируем приложение, добавляем к нему в качестве атрибута config
словарь с настройками. Делаем это опять же для того, чтобы иметь возможность обращаться к ним внутри эндпоинтов через экземплярRequest
, а именно в декораторе route
.
Как я говорил выше, описываем сервисы, частично повторяя их структуру.
routes.py
— конечные точки входа в сервис (endpoints).
Код
from typing import List
from fastapi import APIRouter, Response
from fastapi.params import Query
from starlette import status
from starlette.requests import Request
from gateway.common import router
from gateway.srv_clients.schemas import ClientSchema, ClientViewSchema
clients_router = APIRouter(prefix="/clients")
@router(
method=clients_router.get,
path="",
response_model=List[ClientViewSchema],
)
async def get_filtered_clients(
request: Request,
response: Response,
surname: str = Query(None, description="Фамилия"),
name: str = Query(None, description="Имя"),
country: str = Query(None, description="Страна"),
limit: int = Query(
None, description="Количество записей на странице результатов"
),
offset: int = Query(
None,
description="Смещение страницы результатов относительно первой записи",
),
):
pass
@router(
method=clients_router.post,
path="",
response_model=ClientViewSchema,
status_code=status.HTTP_201_CREATED,
data_key="data",
)
async def create_client(
request: Request, response: Response, data: ClientSchema
):
pass
@router(
method=clients_router.get,
path="/{client_id}",
response_model=ClientViewSchema,
)
async def get_client(request: Request, response: Response, client_id: int):
pass
@router(
method=clients_router.patch,
path="/{client_id}",
response_model=ClientViewSchema,
status_code=status.HTTP_202_ACCEPTED,
data_key="data",
)
async def update_client(
request: Request, response: Response, client_id: int, data: ClientSchema
):
pass
@router(
method=clients_router.delete,
path="/{client_id}",
status_code=status.HTTP_202_ACCEPTED,
)
async def delete_client(request: Request, response: Response, client_id: int):
pass
Все эндпоинты не содержат код. Запросы выполняются внутри декоратора, в который мы оборачиваем функцию.
schemas.py
— Схемы входных и выходных данных.
Код
from pydantic import BaseModel, Field
class ClientSchema(BaseModel):
surname: str = Field(..., description="Фамилия")
name: str = Field(..., description="Имя")
country: str = Field(..., description="Страна")
class ClientViewSchema(ClientSchema):
id: int = Field(..., description="Идентификатор")
Здесь полная копия схем сервиса.
Готово! Запустим шлюз на 8000-м порте:
python -m gunicorn --bind 0.0.0.0:8000 --workers 4 -k uvicorn.workers.UvicornWorker main:app
Пробы
Вся связка готова. Можно запускать все вместе и применять функционал на практике.
Создадим клиента:
POST http://localhost:8000/clients
Content-Type: application/json
{
"surname": "John",
"name": "Malkovitch",
"country": "USA"
}
И получим ответ:
HTTP/1.1 201 Created
date: Mon, 06 Feb 2023 10:43:10 GMT
server: uvicorn
content-length: 61
content-type: application/json
{
"surname": "John",
"name": "Malkovitch",
"country": "USA",
"id": 1
}
Запросим список клиентов:
GET http://localhost:8000/clients
Content-Type: application/json
Результат будет таким:
HTTP/1.1 200 OK
date: Mon, 06 Feb 2023 10:45:12 GMT
server: uvicorn
content-length: 63
content-type: application/json
[
{
"surname": "John",
"name": "Malkovitch",
"country": "USA",
"id": 1
}
]
Ожидаемо получаем список json‑объектов.
Так же запросим запись по идентификатору:
GET http://localhost:8000/clients/1
Content-Type: application/json
Результат будет таким:
HTTP/1.1 200 OK
date: Mon, 06 Feb 2023 10:48:26 GMT
server: uvicorn
content-length: 61
content-type: application/json
{
"surname": "John",
"name": "Malkovitch",
"country": "USA",
"id": 1
}
Изменим нашего Джона Малковича из США на Джона Смита из Британии:
PATCH http://localhost:8000/clients/1
Content-Type: application/json
{
"surname": "John",
"name": "Smith",
"country": "Great Britain"
}
В ответ получаем измененную запись:
HTTP/1.1 202 Accepted
date: Mon, 06 Feb 2023 10:52:02 GMT
server: uvicorn
content-length: 66
content-type: application/json
{
"surname": "John",
"name": "Smith",
"country": "Great Britain",
"id": 1
}
А теперь удалим его:
DELETE http://localhost:8000/clients/1
Content-Type: application/json
В ответе теперь нас интересует лишь статус и он 202-й:
HTTP/1.1 202 Accepted
date: Mon, 06 Feb 2023 10:52:37 GMT
server: uvicorn
content-length: 4
content-type: application/json
null
Вот и всё. Проделано много работы и написано много кода. RPC достаточно старая, но хорошо себя зарекомендовавшая технология. Данный вариант взаимодействия пользователя с сервисами не ультимативен и может быть реализован с использованием иных технологий. Спасибо, что дочитали до конца. До скорых встреч!
SergeyDeryabin
А зачем на самой первой картинке нужен RabbitMQ? Почему нельзя сразу на бэк запросы прикидывать?
v_suntsov Автор
Самая главная причина изложена в заголовке — такая поставлена задача.
SergeyDeryabin
Это я понял, зачем через Rabbit все гнать? В чем плюс использования Rabbit вместо проксирования на бэк без очереди?
affinity
вероятно если инстансов какого-то сервиса несколько, то что бы не заниматься балансировкой (и вообще не знать сколько их там и в какой момент времени) - проще так. А там кто первый взял сообщение - тот и пашет. Но в статье про это ничего и нет :(
SergeyDeryabin
Инстансы чего, RPC? Он на схеме один. Инстансы идут уже за RPC и gunicorn
affinity
Справедливо. На схеме нет вариантов что бы тех же RPC было несколько. Прошу прощения - не внимательность :)
SergeyDeryabin
Если это сделано для эксперимента и развития, то это одно.
А если в рабочем проекте - то я вижу только перегрузку архитектуры и дополнительную точку отказа.
ovalsky
Т.е. это "туториал" для эксперимента?
v_suntsov Автор
Да. Эксперимент. Мой личный. Не на все вопросы я быстро находил ответы в процессе решения в русскоязычном интернете, поэтому решил все собрать воедино в одном месте.
v_suntsov Автор
На схеме - да, действительно один, но в тексте я указал, что их так же можно запустить n-ое количество.
v_suntsov Автор
Вопрос изоляции частей архитектуры. Наружу микросервисная часть не смотрит вообще и может быть максимально изолирована. В случае получения доступа за шлюз, доступа к критической инфраструктуре не будет. Ну и да — балансировки нагрузки.
SergeyDeryabin
Ничего не изменится, если API Gateway будет сам проксировать на RPC
Балансировать нагрузку может API Gateway, у брокера другая задача
OlegSpectr
У вас по сути за всё это и должен отвечать API Gateway. Сейчас же RabbitMQ используется ради того, чтобы был RabbitMQ, каких-то плюсов в такой реализации нет, более того, сильно увеличивается время ответа
andrejbestuzhev
Вы всё-таки решаете какую-то проблему и публике очень хочется понять: какую именно? Вместо этого вы предлагаете туториал, как решать неизвестную проблему.
v_suntsov Автор
Проблема проста - решение поставленной задачи. Мой интерес организовать заявленную архитектуру. В общем-то, статья об этом.
SergeyDeryabin
Давайте тогда разберем Вашу задачу
В задаче не было указано, для каких целей использовать RabbitMQ? Какие будут преимущества от этого? У Вас банковское приложение или биржевое, где необходимо сохранить запросы на случай падения и вернуться после восстановления к их обработке?
v_suntsov Автор
Вообще, это было тестовое задание. В общих чертах рассказано об архитектуре и предложено реализовать свое видение. Это прототип, лежащий лишь в репозитории GitHub-а. Приложение не банковское, но реализуется для достаточно крупной организации.
andrejbestuzhev
У меня бы возникло множество вопросов к такой постановке задачи. Если это какой-то сервис в вакууме - ну ок.
1. Почему приложению искусственно ограничивают возможность использовать доступные ядра на 100%? (Если gunicorn такое умеет, конечно, я не знаю), т.к. rabbitmq будет передавать сообщения значительно медленнее, чем утилизируются ресурсы процессора.
2. Почему именно rabbitmq, а не kafka?
3. Что будет, если сервис за rabbitmq откажет, что получит пользователь и когда?
4. Что будет в случае отказа rabbitmq?
Вообще, если мы используем очереди, мы предполагаем, что ответ на запрос поступит "когда-нибудь", а в вашей схеме пользователь отправляет, по всей видимости, синхронный запрос. Это повлечёт очень большие задержки, если кто-то в беке затупит. Следовательно, на фронте тоже должен будет стоять какой-то листенер, который будет обрабатывать "ответы" из очереди - ещё одна точка отказа
Я понял, что не вы такую схему придумали, но всё же было бы здорово выяснить у того, кто такую задачу поставил: зачем вообще порождать такого монстра?
bondeg
Чтобы говорить "у нас крутая распределенная архитектура и микросервисы", вестимо. Это как в отзывах одного приложения хвастались "увеличили скорость отдачи бонусного баланса с 30 до 15 секунд". Нет, это не опечатка, именно секунд.