В этой статье хочу рассказать о том, как написать полезный сервис, для получения ИНН по персональным данным (паспортные данные). ИНН физического лица получаем с использование сайта https://service.nalog.ru/. Похожая функциональность, скорее всего, уже где-то и кем-то была реализована. Основная идея статьи - поделиться опытом работы с Python в части создания законченного проекта с использованием контейнера зависимостей, создания слушателей для RabbitMQ и работой с базой данных MongoDB. Работа с клиентами сервиса реализована через RabbitMQ в режиме непрерывного чтения очереди, отправкой результата в выходную очередь. Сервис будет жить в Kubernetes, что требует наличие liveness и readiness проб. Для этого используется веб-сервер.
Общие сведения
Сервис будем реализовывать на Python 3.10 с использованием библиотек aio-pika, fastapi, pydantic, motor и других библиотек, которые будут указаны в pyproject.toml проекта. В качестве базы данных используем MongoDB 4+. Обращение к сервису налоговой выполняется при помощи библиотеки aiohttp. Проект размещён в публичном доступе на GitHub.
Приложение функционирует как слушатель входной очереди и веб-сервер для отдачи liveness и readiness-проб. При получении сообщения в очередь, из заголовка reply-to вычитывается имя выходной очереди, в которую будет направлен ответ. Обработка запроса передаётся в сервис, который проверяет наличие похожего запроса в базе данных. В случае отсутствия данных по клиенту, выполняется запрос к внешнему сервису. Внешний сервис может обработать какое-то количество сообщений без запроса капчи. После превышения лимитов, которые доподлинно не известны (но изменяются при общей повышенной нагрузке), сообщение помещается в мёртвую очередь и через указанное в настройках время возвращается в обработку.
Подготовительные работы для базы данных не требуется. При первом подключении к MongoDB будут созданы необходимые коллекции и индексы.
Контракт общения с сервисом
Определим контракт входного сообщения в формате JSON:
Hidden text
{
"requestId": str,
"firstName": str,
"lastName": str,
"middleName": str,
"birthDate": date,
"documentSerial": str,
"documentNumber": str,
"documentDate": date
}
Все поля интуитивно понятны. Атрибут requestId
должен быть уникален в пределах всех сообщений, имеет смысл передавать его как строковое представление GUID
.
Имя выходной очереди может передаваться через поле reply-to заголовка сообщения.
Контракт выходного сообщения будет следующим:
Hidden text
{
"requestId": str,
"inn": str,
"cached": bool,
"details": str,
"elapsedTime": float
}
В ответе будем отдавать код запроса, собственно ИНН и время, за которое отработал сервис запрос и признак кэшированного ответа.
Структура проекта
Общая структура директорий проекта следующая.
src
|--inn_service
|--clients
|--connection_managers
|--core
|--infrastructure
|--controllers
|--handlers
|--http_server
|--queue_manager
|--models
|--repositories
|--serializers
|--services
main.py
.env.example
.gitignore
docker-compose.yaml
pyproject.yaml
В корневой директории будут размещаться инструменты запуска проекта: docker-compose, make-файл запуска линтинга и тестов. Собственно проект размещён в src/inn_service и содержит:
clients - клиенты для подключения к действительным поставщикам данных (nalog.ru и прочие);
connection_managers - инфраструктурные подключения к базе данных, очередям;
core - общий код приложения (собственно приложение, контейнер);
infrastructure - менеджер обработчиков очередей, сами обработчики, инфраструктурные контроллеры;
models - моделей приложения, DTO-объекты;
repositories - репозиторий для работы с базой данных;
serializers - сериализаторы входных запросов, данных для отправки в провайдер ИНН;
services - сервисы приложения.
Работу по созданию виртуального подключения переложим на PyCharm и poetry. Краткая команда установки: poetry install
.
Настройки приложения
Начнём разработку с создания настроек приложения, используя BaseSettings из пакета pydantic.
В файле settings.py будут находиться настройки.
Hidden text
class Settings(BaseSettings):
app_name: str = 'INN service'
app_request_retry_times: int # Количество попыток обработки внешнего запроса
app_request_retry_sec: int # Время задержки в секундах перед повторной обработкой запроса
http_host: str
http_port: int
http_handler: str = 'asyncio'
mongo_host: str
mongo_port: str
mongo_user: str
mongo_pass: str
mongo_name: str
mongo_rs: Optional[str] = None
mongo_auth: str
mongo_timeout_server_select: int = 5000
rabbitmq_host: str
rabbitmq_port: int
rabbitmq_user: str
rabbitmq_pass: str
rabbitmq_vhost: str
rabbitmq_exchange_type: str
rabbitmq_prefetch_count: int
rabbitmq_source_queue_name: str
client_nalog_url: str # Адрес внешнего сервиса для получения ИНН
client_nalog_timeout_sec: int # Таймаут ожидания ответа от сервиса
client_nalog_retries: int # Количество попыток запросов к внешнему сервису
client_nalog_wait_sec: int # Время ожидания между попытками client_nalog_retries
@property
def mongo_dsn(self) -> str:
mongo_dsn = 'mongodb://{}:{}@{}:{}/{}'.format(
self.mongo_user,
self.mongo_pass,
self.mongo_host,
self.mongo_port,
self.mongo_auth
)
if self.mongo_rs:
mongo_dsn += f'?replicaSet={self.mongo_rs}'
return mongo_dsn
@property
def rabbitmq_dsn(self) -> str:
return 'amqp://{}:{}@{}:{}/{}'.format(
self.rabbitmq_user,
self.rabbitmq_pass,
self.rabbitmq_host,
self.rabbitmq_port,
self.rabbitmq_vhost
)
Предлагаю не указывать значения по умолчанию для настроек. Если что-то пойдёт не так, то сразу увидим проблему. В этот момент можно подготовить сразу и файл .env.example, содержащий настройки по-умолчанию для сервиса.
Подключения к инфраструктуре
Создадим слой подключения к инфраструктуре rabbitmq, mongodb через компоненты aio-pika и motor:
poetry add motor aio-pika fast fastapi uvicorn injector
Слой подключения будет размещаться в connection_managers и предназначен для организация подключения к базе данных и менеджеру очередей. Добавим две миксины для создания механизма регистрации автозапуска и завершения приложения. Механизм автозапуска функций применяется при старте приложения для инициализации подключения к RabbitMQ и MongoDB, а также для создания индексов в коллекции базы данных. В случае возникновения ошибок при подключении, приложение не стартует и выдаётся ошибка в логи.
Hidden text
class StartupEventMixin(ABC):
@abstractmethod
def startup(self) -> Coroutine:
raise NotImplementedError
class ShutdownEventMixin(ABC):
@abstractmethod
def shutdown(self) -> Coroutine:
raise NotImplementedError
На примере RabbitConnectionManager продемонстрируем реализацию.
Hidden text
class RabbitConnectionManager(StartupEventMixin, ShutdownEventMixin, EventLiveProbeMixin):
def startup(self) -> Coroutine:
return self.create_connection()
def shutdown(self) -> Coroutine:
return self.close_connection()
async def create_connection(self) -> None:
self.logger.info('Create connection RabbitMQ')
try:
self._connection = await connect_robust(self._dsn)
self._connection.reconnect_callbacks.add(self.on_connection_restore)
self._connection.close_callbacks.add(self.on_close_connection)
self.connected = True
except ConnectionError as exc:
err_message = f'Rabbit connection problem: {exc}'
self.logger.error(err_message)
raise ConnectionError(err_message)
async def close_connection(self) -> None:
if self._connection:
await self._connection.close()
# ... некоторый код пропущен, полная версия на гитхабе
def on_close_connection(self, *args):
self.logger.error('Lost connection to RabbitMQ...')
self.connected = False
def on_connection_restore(self, *args):
self.logger.info('Connection to RabbitMQ has been restored...')
self._channel = None
self._exchange = None
self.connected = True
При подключении к RabbitMQ устанавливаются функции коллбэков для реагирования на потерю соединения и его восстановление.
Менеджер обработчиков
Менеджер обработчиков предназначен для управления слушателями (consumers) очередей. В проекте используется концепция "мёртвых очередей", которая позволяет отложить сообщение на некоторое время и вернуться к его обработке позже. Причиной для этого может являться долгий ответ от провайдера, временные ошибки провайдера, требование ввода капчи из-за нагрузки. Достаточно подробно механизм мёртвых очередей технически разобран в статье Отложенные ретраи силами RabbitMQ. Каждый обработчик очереди должен хранить и возвращать признак использования ретраев, время между возвратами в основную очередь на обработку, а также имя очереди, которую планирует слушать. Основной код обработчика находится в run_handler
. От функции ожидается True
при успешном обработке либо непоправимой ошибке запроса (некорректное тело сообщения) и False
, если запрос не удалось обработать, но следует повторить позднее.
Код базового обработчика:
Hidden text
class BaseHandler(ABC):
def __init__(
self,
settings: Settings,
logger: AppLogger,
rabbitmq_connection: RabbitConnectionManager
) -> None:
self.settings = settings
self.logger = logger
self.rabbitmq_connection = rabbitmq_connection
@abstractmethod
def get_use_retry(self) -> bool:
raise NotImplementedError
def get_retry_ttl(self) -> int:
return 0
@abstractmethod
def get_source_queue(self) -> str:
raise NotImplementedError
def convert_seconds_to_mseconds(self, value: int) -> int:
return value * 1000
@abstractmethod
async def run_handler(
self,
message: dict,
request_id: Optional[str],
result_queue: Optional[str],
count_retry: Optional[int] = 0
) -> bool:
raise NotImplementedError
Собственно единственный наследник класса RequestHandler
, реализующий приём и обработку сообщения:
Hidden text
class RequestHandler(BaseHandler):
def __init__(
self,
settings: Settings,
logger: AppLogger,
rabbitmq_connection: RabbitConnectionManager,
service: InnService
) -> None:
super().__init__(settings, logger, rabbitmq_connection)
self.source_queue_name = self.settings.rabbitmq_source_queue_name
self.retry_times = self.settings.app_request_retry_times
self.retry_sec = self.settings.app_request_retry_sec
self.service = service
def get_source_queue(self) -> str:
return self.source_queue_name
def get_use_retry(self) -> bool:
return True
def get_retry_ttl(self) -> int:
return self.retry_sec
async def run_handler(
self,
message: dict,
request_id: Optional[str],
result_queue: Optional[str],
count_retry: Optional[int] = 0
) -> bool:
if count_retry > self.retry_times:
self.logger.warning(f'Request {request_id} was rejected by excess attempts {self.retry_times} times')
return True
self.logger.info(f'Get request {request_id} for response {result_queue}')
client_data = RequestSerializer.parse_obj(message)
response = await self.service.get_client_inn(client_data)
if result_queue:
json_message = response.dict()
await self.rabbitmq_connection.send_data_by_queue(json_message, result_queue)
return True
При получении сообщения проверяем количество повторного попадания в очередь через параметр count_retry
. В случае превышения - отправляем статус обработки сообщения (ошибку) в выходную очередь и приостанавливаем обработку данного сообщения. RequestSerializer.parse_obj(message)
не обёрнут в try...except блок потому как менеджер очередей контролирует ошибки преобразования сообщений ValidationError.
Работа с базой данных
Выбор на MongoDB пал из-за простоты использования, отсутствия миграций, гибкой схемы обработки данных. В задаче нет необходимости в хранении зависимых данных, оформление связей между таблицами. Для работы с данными будем использовать паттерн Репозиторий.
В базовом репозитории расположены функции работы с данными, индексами в нотации Mongo, а в конкретных классах реализуем необходимые сервису функции. Создание индексов выполняется при старте приложения в фоновом режиме (флаг background), для чего используется имплементация миксины StartupEventMixin. Запросы набора данных поддерживают пагинацию и сортировку.
Конкретный класс создаётся на каждую отдельную коллекцию. В проекте один репозиторий для клиентских запросов. Модель для хранения данных находится в директории models и называется ClientDataModel
. Клиентская модель создана с типизацией, поддерживаемой MongoDB (datetime вместо date), для атрибута created_at указана функция генерации значения по умолчанию через default_factory. Также в модель добавлена функция подсчёта времени обработки запроса elapsed_time
и метод класса для создания объекта из клиентского запроса.
Hidden text
class ClientDataModel(BaseModel):
created_at: datetime = Field(default_factory=datetime.utcnow)
request_id: str
first_name: str
last_name: str
middle_name: str
birth_date: datetime
birth_place: str = Field(default='')
passport_num: str
document_date: datetime
executed_at: Optional[datetime]
inn: Optional[str]
error: Optional[str]
@classmethod
def create_from_request(cls, request: RequestMqSerializer) -> 'ClientDataModel':
return ClientDataModel(
request_id=request.request_id,
first_name=request.first_name,
last_name=request.last_name,
middle_name=request.middle_name,
birth_date=datetime.combine(request.birth_date, datetime.min.time()),
passport_num='{} {}'.format(request.document_serial, request.document_number),
document_date=datetime.combine(request.document_date, datetime.min.time()),
)
@property
def elapsed_time(self) -> float:
end = self.executed_at or datetime.utcnow()
return (end - self.created_at).total_seconds()
Код базового репозитория:
Hidden text
class BaseRepository(StartupEventMixin):
def __init__(self, mongodb_connection_manager: MongoConnectionManager, setting: Settings) -> None:
self.mongodb_connection_manager = mongodb_connection_manager
self.db_name = setting.mongo_name
@property
def collection_name(self) -> str:
raise NotImplementedError
@property
def collection_indexes(self) -> Iterable[IndexDef]:
raise NotImplementedError
def startup(self) -> Coroutine:
return self.create_indexes()
async def create_index(self, field_name: str, sort_id: int) -> None:
connection = await self.mongodb_connection_manager.get_connection()
collection = connection[self.db_name][self.collection_name]
await collection.create_index([(field_name, sort_id), ], background=True)
async def create_indexes(self) -> None:
tasks = []
for index_item in self.collection_indexes:
tasks.append(self.create_index(index_item.name, index_item.sort))
asyncio.ensure_future(asyncio.gather(*tasks))
async def get_one_document(self, criteria: dict) -> Optional[dict]:
connection = await self.mongodb_connection_manager.get_connection()
collection = connection[self.db_name][self.collection_name]
return await collection.find_one(criteria)
async def get_list_document(
self,
criteria: dict,
sort_criteria: Optional[list] = None,
limit: Optional[int] = 0,
skip: Optional[int] = 0,
) -> List[dict]:
if not sort_criteria:
sort_criteria = []
connection = await self.mongodb_connection_manager.get_connection()
cursor = connection[self.db_name][self.collection_name].find(
criteria,
limit=limit,
skip=skip,
sort=sort_criteria
)
result = list()
async for data in cursor:
result.append(data)
return result
async def save_document(self, data: dict) -> str:
connection = await self.mongodb_connection_manager.get_connection()
result = await connection[self.db_name][self.collection_name].insert_one(data)
return result.inserted_id
async def update_document(self, criteria: dict, data: dict) -> None:
connection = await self.mongodb_connection_manager.get_connection()
await connection[self.db_name][self.collection_name].update_one(criteria, {'$set': data})
Сервисный слой
Сервисный слой выполняет всю необходимую обработку с данными.
обращение в базу данных для поиска аналогичного запроса (request_id и паспортные данные);
отдать результат, если данные были найдены;
выполнить запрос к API;
сохранить результат запроса в базу данных;
вернуть ответ.
В сервисном слое попытался абстрагироваться от работы с инфраструктурой. Возврат ответа производится в вызывающую функцию, которая должна знать куда вернуть ответ. В данном случае, менеджер очередей "знает" куда ему ответить благодаря наличию поля reply-to в заголовке запроса. Возвращаемое значение оформлено в виде DTO-объекта (RequestDTO
).
Код класса InnService
:
Hidden text
class InnService:
def __init__(
self,
settings: Settings,
logger: AppLogger,
client: NalogApiClient,
storage: RequestRepository
) -> None:
self.settings = settings
self.logger = logger
self.client = client
self.storage_repository = storage
async def get_client_inn_from_storage(self, client_data: RequestSerializer) -> Optional[RequestModel]:
client_passport = f'{client_data.document_serial} {client_data.document_number}'
client_request = await self.storage_repository.find_request(client_passport, client_data.request_id)
return client_request
def update_status(self, model: RequestModel, inn: str, error: str) -> None:
model.inn = inn
model.error = error
async def get_client_inn(self, client_data: RequestSerializer) -> RequestDTO:
"""Получение клиентского ИНН"""
start_process = datetime.utcnow()
model = RequestModel.create_from_request(client_data)
# Получить данные из БД
existing_data = await self.get_client_inn_from_storage(client_data)
if existing_data:
elapsed_time = (datetime.utcnow() - start_process).total_seconds()
return RequestDTO(
request_id=client_data.request_id,
inn=existing_data.inn,
elapsed_time=elapsed_time,
cashed=True
)
# Сделать фактический запрос в Nalog API
request = NalogApiRequestSerializer.create_from_request(client_data)
error, result = None, ''
try:
result = await self.client.send_request_for_inn(request)
except NalogApiClientException as exception:
self.logger.error('Error request to Nalog api service', details=str(exception))
error = str(exception)
self.update_status(model, result, error)
await self.storage_repository.save_request(model)
return RequestDTO(
request_id=model.request_id,
inn=model.inn,
details=model.error,
elapsed_time=model.elapsed_time
)
Второй сервис в приложении - это сервис опроса инфраструктуры для health-check. Инфраструктурные менеджеры, которые необходимо мониторить, должны наследоваться от миксины EventLiveProbeMixin
и реализовать функцию is_connected
.
Клиент
Клиент NalogApiClient предназначен для выполнения POST запроса к https://service.nalog.ru/inn.do и разбора статуса ответа. Функция непосредственного оформления запроса обёрнута в retry декоратор повторителя запроса при возникновении ошибок. Настройки повторителя в общих настройках приложения.
Hidden text
class NalogApiClient:
CLIENT_EXCEPTIONS = (
NalogApiClientException,
aiohttp.ClientProxyConnectionError,
aiohttp.ServerTimeoutError,
)
def __init__(self, settings: Settings, logger: AppLogger):
self.nalog_api_service_url = settings.client_nalog_url
self.request_timeout = settings.client_nalog_timeout_sec
self.retries_times = settings.client_nalog_retries
self.retries_wait = settings.client_nalog_wait_sec
self.logger = logger
self.timeout = aiohttp.ClientTimeout(total=self.request_timeout)
@property
def _headers(self):
return {
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "ru-RU,ru",
"Connection": "keep-alive",
"Origin": "https://service.nalog.ru",
"Referer": self.nalog_api_service_url,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Sec-GPC": "1",
"X-Requested-With": "XMLHttpRequest",
}
async def send_request_for_inn(self, nalog_api_request: NalogApiRequestSerializer) -> Optional[str]:
self.logger.debug(f'Request to nalog api service for {nalog_api_request.client_fullname}')
form_data = nalog_api_request.dict(by_alias=True)
@retry(self.CLIENT_EXCEPTIONS, logger=self.logger, attempts=self.retries_times, wait_sec=self.retries_wait)
async def make_request(client_session: aiohttp.ClientSession):
async with client_session.post(url=self.nalog_api_service_url, data=form_data) as response:
if response.status not in [http.HTTPStatus.OK, http.HTTPStatus.NOT_FOUND]:
response_text = await response.text()
raise NalogApiClientException(response_text)
data = await response.json()
code = data.get('code')
captcha_required = data.get('captchaRequired')
if captcha_required:
raise NalogApiClientException(f'Captcha required for request {nalog_api_request.client_fullname}')
if code == 0:
return 'no inn'
elif code == 1:
return data.get('inn')
else:
raise NalogApiClientException(f'Unable to parse response! Details: {response}')
async with aiohttp.ClientSession(timeout=self.timeout, headers=self._headers) as session:
return await make_request(session)
Контейнер
Контейнер предназначен для сборки необходимых зависимостей и передачи их в приложение. Наш контейнер собран в классе ApplicationContainer. Все зависимости пробрасываются в виде синглтонов @singleton и регистрируются как провайдеры зависимостей типов @provider предоставляемых библиотекой injector. При написании тестов необходимо подготовить другой контейнер с актуальными fake или stub-объектами.
Основной интерес по работе с контейнером сосредоточен в классе ContainerManager
, который используется для проверки реализации миксин EventSubscriberMixin
и EventLiveProbeMixin
. Функция get_event_collection
формирует списки функций обратного вызова для старта и выхода из приложения. Собственно проход по спискам и вызов функций обратного вызова реализован в функциях: run_startup
и run_shutdown
.
Hidden text
class ContainerManager:
def __init__(self, cls_container: Type[Container]) -> None:
self._container = Injector(cls_container())
self._bindings = self._container.binder._bindings
def get_container(self) -> Injector:
return self._container
def get_live_probe_handlers(self) -> List[Type[Callable]]:
result = []
binding_collection = [binding for binding in self._bindings]
for binding in binding_collection:
if issubclass(binding, EventLiveProbeMixin):
binding_obj = self._container.get(binding)
result.append(binding_obj.is_connected)
return result
def get_startup_handlers(self):
handlers = []
binding_collection = [binding for binding in self._bindings]
for binding in binding_collection:
if issubclass(binding, StartupEventMixin):
binding_obj = self._container.get(binding)
handlers.append(binding_obj.startup())
return handlers
def get_shutdown_handlers(self):
handlers = []
binding_collection = [binding for binding in self._bindings]
for binding in binding_collection:
if issubclass(binding, ShutdownEventMixin):
binding_obj = self._container.get(binding)
handlers.append(binding_obj.shutdown())
return handlers
async def run_startup(self) -> None:
exception = None
for handler in self.get_startup_handlers():
if exception:
handler.close()
else:
try:
await handler
except Exception as exc:
exception = exc
if exception is not None:
raise exception
async def run_shutdown(self) -> None:
handlers = []
for handler in self.get_shutdown_handlers():
handlers.append(handler)
await asyncio.gather(*handlers)
Собственно сам контейнер, в котором производится инициализация нужных экземпляров классов. При написании тестов будет создан аналогичный контейнер.
Hidden text
class ApplicationContainer(Container):
@singleton
@provider
def provide_settings(self) -> Settings:
return Settings()
# ... немного кода пропущено
@singleton
@provider
def provide_mongodb_connection(self, settings: Settings, logger: AppLogger) -> MongoConnectionManager:
return MongoConnectionManager(settings, logger)
@singleton
@provider
def provide_rabbitmq_connection(self, settings: Settings, logger: AppLogger) -> RabbitConnectionManager:
return RabbitConnectionManager(settings, logger)
@singleton
@provider
def provide_nalog_api_client(self, settings: Settings, logger: AppLogger) -> NalogApiClient:
return NalogApiClient(settings, logger)
@singleton
@provider
def provide_request_repository(self, settings: Settings, mongo_connection: MongoConnectionManager) -> RequestRepository:
return RequestRepository(mongo_connection, settings)
Приложение
Основная задача приложения - собрать всё воедино и запустить общий поток выполнения. Код сборки приложения предельно простой, инициализацию классов выполняет менеджер контейнера. Сборка приложения выполняется следующими шагами:
получение контейнера, передача его в менеджер контейнеров;
инициализация event_loop;
добавление обработчиков для очередей;
запуск инициализаторов для инфраструктурного слоя (реализующих startup миксины);
запуск веб-сервера FastAPI для отдачи health-check;
включение глобального обработчика ошибок.
Hidden text
class Application:
def __init__(self, cls_container: Type[Container]) -> None:
self.loop = asyncio.get_event_loop()
self.container_manager = ContainerManager(cls_container)
self.container = self.container_manager.get_container()
self.settings = self.container.get(Settings)
self.logger = self.container.get(AppLogger)
self.live_probe_service = self.container.get(LiveProbeService)
self.queue_manager = self.container.get(QueueManager)
self.app_name = self.settings.app_name
self.http_server = None
def init_application(self):
self.http_server = ServerAPIManager(self.container)
request_handler = self.container.get(RequestHandler)
self.queue_manager.add_handler(request_handler)
live_probe_handlers = self.container_manager.get_live_probe_handlers()
for handler in live_probe_handlers:
self.live_probe_service.add_component(handler)
def run(self) -> None:
self.logger.info(f'Starting application {self.app_name}')
self.init_application()
try:
self.loop.run_until_complete(self.container_manager.run_startup())
tasks = asyncio.gather(
self.http_server.serve(),
self.queue_manager.run_handlers_async(),
)
self.loop.run_until_complete(tasks)
self.loop.run_forever()
except BaseException as exception:
exit(1)
finally:
self.loop.run_until_complete(self.container_manager.run_shutdown())
self.loop.close()
self.logger.info('Application disabled')
Приложение стартует из main-скрипта с использованием небольшой библиотеки typer. Маленькая библиотека имеет возможность удобно обрабатывать параметры командной строки.
Hidden text
import typer
from core.application import Application
from app_container import ApplicationContainer
def main():
try:
application = Application(ApplicationContainer)
application.run()
except BaseException as exc:
typer.echo(f'Error starting application. Details: {str(exc)}')
if __name__ == "__main__":
typer.run(main)
Как это всё запустить?
Проект содержит файл docker-compose для сборки. Необходимо скопировать файл .env.example
в файл .env
.
docker compose build
docker compose up
После выполнения этих команд, будет запущен экземпляр mongodb на 27017 порту и rabbitmq на 5672 порту с админкой на 15672. В административную панель RabbitMQ можно зайти по адресу http://localhost:15672. В разделе очередей необходимо создать новую очередь, в которую будут направляться результаты работы сервиса и прибиндить её к exchange по умолчанию (direct).
Продолжение следует
В статье рассмотрена тема разработки приложения на Python с использованием очередей, контейнером зависимостей и поддержкой health-check. Предлагаю обсудить архитектуру в комментариях, а затем продолжить развивать сервис. Следующими итерациями планирую добавить гипотетического не бесплатного клиента, которого будем использовать после определённого количества запросов в бесплатный сервис. И в завершении написать тесты.
Материалы, которые могут быть полезны для понимания материала:
RabbitMQ Tutorials (англ) - https://www.rabbitmq.com/getstarted.html
Презентация Dependency injection от создателя Dependency Injector (рус). В проекте используется библиотека injector, но общий смысл romanmogylatov6331 доносит понятно - https://www.youtube.com/watch?v=VdtxdDeG7RA
funca
Честно говоря двоякое чувство. С одной стороны интригующий выбор темы (с персональными данными, коммуникацией с госами), а с другой - реализация с массой недосказанностей. По сути я думал это статья к студенческой работе или тестовому заданию перспективного джуниора, пока не заглянул в профиль автора.
Софт без задефайненных non-functional requirements, quality attributes и код без тестов это прототип. На этом уровне техническая задача обычно сводится к тому, чтобы решение лишь бы хоть как-то работало. Работает? - вы молодец. Но дальше в отсутствии требований об архитектуре сервиса говорить нет смысла потому, что архитектура выбирается исходя из требований. Вы же понимаете, что допустим для достижения availablity 99.9 и 99.999 у вас будут два принципиально разных архитектурных решения и разницей по сложности и стоимости примерно на один-два порядка.
Но проблема даже не в этом. Принимая ФИО, паспортные данные и т.п., с точки зрения законодательства вы становитесь оператором персональных данных, со всеми вытекающими требованиями - к хранению, обработке и передаче. Я не увидел даже намека. Вы хотели отзыв: десять лет тому назад это выглядело бы даже неплохо (все эти базворды типа onion architecture, DI и т.п.). Но в 2023 предложенное решение - детский сад штаны на лямках - в решении уровня лида хочется все же видеть больше осмысленности.
Filyushin Автор
Спасибо за первый комментарий к первой статье.
По интригующей теме работы с госами, наверное, не скажу ничего, потому как более интересной была бы реализация через СМЭВ. Однако здесь рассмотрен пример сервиса, работающего через очереди, построенный, да, как вы сказали, на устаревших базвордах луковичной архитектуры и контейнеров. Тема получения ИНН скорее второстепенна в статье. Это лишь полезная нагрузка для построения сервиса, некая абстрактная задача, которую можно развивать и на чём можно учиться.
Согласен с отсутствием нефункциональных требований и метрик качества. Сформулировать сейчас не получится, но в одном из разделов указал на скорость ответа базового сервиса от налоговой. При "развитии" сервиса попытаемся сформулировать и эти требования.
Про тесты написано в последнем разделе. Предвосхищая ваш вопрос почему сначала не пишем тесты, а потом код - так не было запроса на следование TDD.
funca
Я ни где не говорил, что это устарело. Скорее развилось и стало рутиной. А вот подходы к работе с PII поменялись радикально из-за необходимости соблюдать кучу юридических церемоний. В вашем решении данные не классифицируются, свободно передаваясь между компонентами. Поэтому с предложенной архитектурой бизнес влетит на мешок денег после первого же compliance report. Чтобы этого не произошло, такие требования нужно закладывать сразу. Попробуйте, вы сразу увидите как это заставит перекроить всю архитектуру.
Поэтому, если нет желания показывать свои навыки в проектировании конкретно приложений для обработки PII, для демо сейчас выбирают что-то более нейтральное, типа API прогноза погоды, котировок валют или подобное. Это будет в разы проще.
Что касается тестов, то писать их до или после это тактика. Главное, чтобы в конце они появились. :) Ну и линтер с тайпчекером прикрутить к CI вроде уже не экзотика.
WhiskyBar
Двоякое чувство от вашего комментария, с одной стороны вроде как по делу, если мы говорим о неком продовом решении, или как ответ на конкретное ТЗ, с другой стороны нигде не заявлялось, что в статье будут рассмотрены данные аспекты:
«Основная идея статьи — поделиться опытом работы с Python в части создания законченного проекта с использованием контейнера зависимостей, создания слушателей для RabbitMQ и работой с базой данных MongoDB.»
Возможно часть дискуссии ведется вне данной площадки, и я встреваю не в чужой диалог, но ради интереса посмотрел, о чем вы писали "лет 10 назад".
askolo4ek
Честно говоря, двоякое чувство от вашего комментария. С одной стороны вроде как по делу, если мы говорим о некотором конкретном пользователе, с другой стороны не придумал пока)