В этой статье хочу рассказать о том, как написать полезный сервис, для получения ИНН по персональным данным (паспортные данные). ИНН физического лица получаем с использование сайта https://service.nalog.ru/. Похожая функциональность, скорее всего, уже где-то и кем-то была реализована. Основная идея статьи - поделиться опытом работы с Python в части создания законченного проекта с использованием контейнера зависимостей, создания слушателей для RabbitMQ и работой с базой данных MongoDB. Работа с клиентами сервиса реализована через RabbitMQ в режиме непрерывного чтения очереди, отправкой результата в выходную очередь. Сервис будет жить в Kubernetes, что требует наличие liveness и readiness проб. Для этого используется веб-сервер.

Фото by Christina Morillo from Pexels
Фото by Christina Morillo from Pexels

Общие сведения

Сервис будем реализовывать на Python 3.10 с использованием библиотек aio-pika, fastapi, pydantic, motor и других библиотек, которые будут указаны в pyproject.toml проекта. В качестве базы данных используем MongoDB 4+. Обращение к сервису налоговой выполняется при помощи библиотеки aiohttp. Проект размещён в публичном доступе на GitHub.

Приложение функционирует как слушатель входной очереди и веб-сервер для отдачи liveness и readiness-проб. При получении сообщения в очередь, из заголовка reply-to вычитывается имя выходной очереди, в которую будет направлен ответ. Обработка запроса передаётся в сервис, который проверяет наличие похожего запроса в базе данных. В случае отсутствия данных по клиенту, выполняется запрос к внешнему сервису. Внешний сервис может обработать какое-то количество сообщений без запроса капчи. После превышения лимитов, которые доподлинно не известны (но изменяются при общей повышенной нагрузке), сообщение помещается в мёртвую очередь и через указанное в настройках время возвращается в обработку.

Подготовительные работы для базы данных не требуется. При первом подключении к MongoDB будут созданы необходимые коллекции и индексы.

Контракт общения с сервисом

Определим контракт входного сообщения в формате JSON:

Hidden text
{
	 "requestId": str,
	 "firstName": str,
	 "lastName": str,
	 "middleName": str,
	 "birthDate": date,
	 "documentSerial": str,
	 "documentNumber": str,
	 "documentDate": date
}

Все поля интуитивно понятны. Атрибут requestId должен быть уникален в пределах всех сообщений, имеет смысл передавать его как строковое представление GUID.

Имя выходной очереди может передаваться через поле reply-to заголовка сообщения.

Контракт выходного сообщения будет следующим:

Hidden text
{
	"requestId": str,
	"inn": str,
	"cached": bool,
	"details": str,
	"elapsedTime": float
}

В ответе будем отдавать код запроса, собственно ИНН и время, за которое отработал сервис запрос и признак кэшированного ответа.

Структура проекта

Общая структура директорий проекта следующая.

src
  |--inn_service
    |--clients
    |--connection_managers
    |--core
    |--infrastructure
       |--controllers
       |--handlers
       |--http_server
       |--queue_manager                     
    |--models    
    |--repositories
    |--serializers
    |--services    
  main.py
.env.example
.gitignore
docker-compose.yaml
pyproject.yaml

В корневой директории будут размещаться инструменты запуска проекта: docker-compose, make-файл запуска линтинга и тестов. Собственно проект размещён в src/inn_service и содержит:

  • clients - клиенты для подключения к действительным поставщикам данных (nalog.ru и прочие);

  • connection_managers - инфраструктурные подключения к базе данных, очередям;

  • core - общий код приложения (собственно приложение, контейнер);

  • infrastructure - менеджер обработчиков очередей, сами обработчики, инфраструктурные контроллеры;

  • models - моделей приложения, DTO-объекты;

  • repositories - репозиторий для работы с базой данных;

  • serializers - сериализаторы входных запросов, данных для отправки в провайдер ИНН;

  • services - сервисы приложения.

Работу по созданию виртуального подключения переложим на PyCharm и poetry. Краткая команда установки: poetry install.

Настройки приложения

Начнём разработку с создания настроек приложения, используя BaseSettings из пакета pydantic.

В файле settings.py будут находиться настройки.

Hidden text
class Settings(BaseSettings):  
    app_name: str = 'INN service'  
    app_request_retry_times: int  # Количество попыток обработки внешнего запроса  
    app_request_retry_sec: int  # Время задержки в секундах перед повторной обработкой запроса  
  
    http_host: str  
    http_port: int  
    http_handler: str = 'asyncio'  
  
    mongo_host: str  
    mongo_port: str  
    mongo_user: str  
    mongo_pass: str  
    mongo_name: str  
    mongo_rs: Optional[str] = None  
    mongo_auth: str  
    mongo_timeout_server_select: int = 5000  
  
    rabbitmq_host: str  
    rabbitmq_port: int  
    rabbitmq_user: str  
    rabbitmq_pass: str  
    rabbitmq_vhost: str  
    rabbitmq_exchange_type: str  
    rabbitmq_prefetch_count: int  
    rabbitmq_source_queue_name: str  
  
    client_nalog_url: str  # Адрес внешнего сервиса для получения ИНН  
    client_nalog_timeout_sec: int  # Таймаут ожидания ответа от сервиса  
    client_nalog_retries: int  # Количество попыток запросов к внешнему сервису  
    client_nalog_wait_sec: int  # Время ожидания между попытками client_nalog_retries  
  
    @property  
    def mongo_dsn(self) -> str:  
        mongo_dsn = 'mongodb://{}:{}@{}:{}/{}'.format(  
            self.mongo_user,  
            self.mongo_pass,  
            self.mongo_host,  
            self.mongo_port,  
            self.mongo_auth  
        )  
  
        if self.mongo_rs:  
            mongo_dsn += f'?replicaSet={self.mongo_rs}'  
  
        return mongo_dsn  
  
    @property  
    def rabbitmq_dsn(self) -> str:  
        return 'amqp://{}:{}@{}:{}/{}'.format(  
            self.rabbitmq_user,  
            self.rabbitmq_pass,  
            self.rabbitmq_host,  
            self.rabbitmq_port,  
            self.rabbitmq_vhost  
        )

Предлагаю не указывать значения по умолчанию для настроек. Если что-то пойдёт не так, то сразу увидим проблему. В этот момент можно подготовить сразу и файл .env.example, содержащий настройки по-умолчанию для сервиса.

Подключения к инфраструктуре

Создадим слой подключения к инфраструктуре rabbitmq, mongodb через компоненты aio-pika и motor:

poetry add motor aio-pika fast fastapi uvicorn injector

Слой подключения будет размещаться в connection_managers и предназначен для организация подключения к базе данных и менеджеру очередей. Добавим две миксины для создания механизма регистрации автозапуска и завершения приложения. Механизм автозапуска функций применяется при старте приложения для инициализации подключения к RabbitMQ и MongoDB, а также для создания индексов в коллекции базы данных. В случае возникновения ошибок при подключении, приложение не стартует и выдаётся ошибка в логи.

Hidden text
class StartupEventMixin(ABC):  
  
    @abstractmethod  
    def startup(self) -> Coroutine:  
        raise NotImplementedError  
  
  
class ShutdownEventMixin(ABC):  
  
    @abstractmethod  
    def shutdown(self) -> Coroutine:  
        raise NotImplementedError

На примере RabbitConnectionManager продемонстрируем реализацию.

Hidden text
class RabbitConnectionManager(StartupEventMixin, ShutdownEventMixin, EventLiveProbeMixin):  
    def startup(self) -> Coroutine:  
        return self.create_connection()

	def shutdown(self) -> Coroutine:  
        return self.close_connection()  
  
	async def create_connection(self) -> None:  
	    self.logger.info('Create connection RabbitMQ')  
	    try:  
	        self._connection = await connect_robust(self._dsn)  
	        self._connection.reconnect_callbacks.add(self.on_connection_restore)  
	        self._connection.close_callbacks.add(self.on_close_connection)  
	        self.connected = True  
	    except ConnectionError as exc:  
	        err_message = f'Rabbit connection problem: {exc}'  
	        self.logger.error(err_message)  
	        raise ConnectionError(err_message)  
	  
	async def close_connection(self) -> None:  
	    if self._connection:  
	        await self._connection.close()
	
	# ... некоторый код пропущен, полная версия на гитхабе
	
	def on_close_connection(self, *args):  
	    self.logger.error('Lost connection to RabbitMQ...')  
	    self.connected = False  
	  
	def on_connection_restore(self, *args):  
	    self.logger.info('Connection to RabbitMQ has been restored...')  
	    self._channel = None  
	    self._exchange = None  
	    self.connected = True

При подключении к RabbitMQ устанавливаются функции коллбэков для реагирования на потерю соединения и его восстановление.

Менеджер обработчиков

Менеджер обработчиков предназначен для управления слушателями (consumers) очередей. В проекте используется концепция "мёртвых очередей", которая позволяет отложить сообщение на некоторое время и вернуться к его обработке позже. Причиной для этого может являться долгий ответ от провайдера, временные ошибки провайдера, требование ввода капчи из-за нагрузки. Достаточно подробно механизм мёртвых очередей технически разобран в статье Отложенные ретраи силами RabbitMQ. Каждый обработчик очереди должен хранить и возвращать признак использования ретраев, время между возвратами в основную очередь на обработку, а также имя очереди, которую планирует слушать. Основной код обработчика находится в run_handler. От функции ожидается True при успешном обработке либо непоправимой ошибке запроса (некорректное тело сообщения) и False, если запрос не удалось обработать, но следует повторить позднее.

Код базового обработчика:

Hidden text
class BaseHandler(ABC):  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            rabbitmq_connection: RabbitConnectionManager  
    ) -> None:  
        self.settings = settings  
        self.logger = logger  
        self.rabbitmq_connection = rabbitmq_connection  
  
    @abstractmethod  
    def get_use_retry(self) -> bool:  
        raise NotImplementedError  
  
    def get_retry_ttl(self) -> int:  
        return 0  
  
    @abstractmethod  
    def get_source_queue(self) -> str:  
        raise NotImplementedError  
  
    def convert_seconds_to_mseconds(self, value: int) -> int:  
        return value * 1000  
  
    @abstractmethod  
    async def run_handler(  
            self,  
            message: dict,  
            request_id: Optional[str],  
            result_queue: Optional[str],  
            count_retry: Optional[int] = 0  
    ) -> bool:  
        raise NotImplementedError

Собственно единственный наследник класса RequestHandler, реализующий приём и обработку сообщения:

Hidden text
class RequestHandler(BaseHandler):  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            rabbitmq_connection: RabbitConnectionManager,  
            service: InnService  
    ) -> None:  
        super().__init__(settings, logger, rabbitmq_connection)  
        self.source_queue_name = self.settings.rabbitmq_source_queue_name  
        self.retry_times = self.settings.app_request_retry_times  
        self.retry_sec = self.settings.app_request_retry_sec  
        self.service = service  
  
    def get_source_queue(self) -> str:  
        return self.source_queue_name  
  
    def get_use_retry(self) -> bool:  
        return True  
  
    def get_retry_ttl(self) -> int:  
        return self.retry_sec  
  
    async def run_handler(  
            self,  
            message: dict,  
            request_id: Optional[str],  
            result_queue: Optional[str],  
            count_retry: Optional[int] = 0  
    ) -> bool:  
        if count_retry > self.retry_times:  
            self.logger.warning(f'Request {request_id} was rejected by excess attempts {self.retry_times} times')  
            return True  
  
        self.logger.info(f'Get request {request_id} for response {result_queue}')  
  
        client_data = RequestSerializer.parse_obj(message)  
  
        response = await self.service.get_client_inn(client_data)  
  
        if result_queue:  
            json_message = response.dict()  
            await self.rabbitmq_connection.send_data_by_queue(json_message, result_queue)  
  
        return True

При получении сообщения проверяем количество повторного попадания в очередь через параметр count_retry. В случае превышения - отправляем статус обработки сообщения (ошибку) в выходную очередь и приостанавливаем обработку данного сообщения. RequestSerializer.parse_obj(message) не обёрнут в try...except блок потому как менеджер очередей контролирует ошибки преобразования сообщений ValidationError.

Работа с базой данных

Выбор на MongoDB пал из-за простоты использования, отсутствия миграций, гибкой схемы обработки данных. В задаче нет необходимости в хранении зависимых данных, оформление связей между таблицами. Для работы с данными будем использовать паттерн Репозиторий.

В базовом репозитории расположены функции работы с данными, индексами в нотации Mongo, а в конкретных классах реализуем необходимые сервису функции. Создание индексов выполняется при старте приложения в фоновом режиме (флаг background), для чего используется имплементация миксины StartupEventMixin. Запросы набора данных поддерживают пагинацию и сортировку.

Конкретный класс создаётся на каждую отдельную коллекцию. В проекте один репозиторий для клиентских запросов. Модель для хранения данных находится в директории models и называется ClientDataModel. Клиентская модель создана с типизацией, поддерживаемой MongoDB (datetime вместо date), для атрибута created_at указана функция генерации значения по умолчанию через default_factory. Также в модель добавлена функция подсчёта времени обработки запроса elapsed_time и метод класса для создания объекта из клиентского запроса.

Hidden text
class ClientDataModel(BaseModel):  
    created_at: datetime = Field(default_factory=datetime.utcnow)  
    request_id: str  
    first_name: str  
    last_name: str  
    middle_name: str  
    birth_date: datetime  
    birth_place: str = Field(default='')  
    passport_num: str  
    document_date: datetime  
    executed_at: Optional[datetime]  
    inn: Optional[str]  
    error: Optional[str]  
  
    @classmethod  
    def create_from_request(cls, request: RequestMqSerializer) -> 'ClientDataModel':  
        return ClientDataModel(  
            request_id=request.request_id,  
            first_name=request.first_name,  
            last_name=request.last_name,  
            middle_name=request.middle_name,  
            birth_date=datetime.combine(request.birth_date, datetime.min.time()),  
            passport_num='{} {}'.format(request.document_serial, request.document_number),  
            document_date=datetime.combine(request.document_date, datetime.min.time()),  
        )  
  
    @property  
    def elapsed_time(self) -> float:  
        end = self.executed_at or datetime.utcnow()  
        return (end - self.created_at).total_seconds()

Код базового репозитория:

Hidden text
class BaseRepository(StartupEventMixin):  
  
    def __init__(self, mongodb_connection_manager: MongoConnectionManager, setting: Settings) -> None:  
        self.mongodb_connection_manager = mongodb_connection_manager  
        self.db_name = setting.mongo_name  
  
    @property  
    def collection_name(self) -> str:  
        raise NotImplementedError  
  
    @property  
    def collection_indexes(self) -> Iterable[IndexDef]:  
        raise NotImplementedError  
  
    def startup(self) -> Coroutine:  
        return self.create_indexes()  
  
    async def create_index(self, field_name: str, sort_id: int) -> None:  
        connection = await self.mongodb_connection_manager.get_connection()  
        collection = connection[self.db_name][self.collection_name]  
        await collection.create_index([(field_name, sort_id), ], background=True)  
  
    async def create_indexes(self) -> None:  
        tasks = []  
        for index_item in self.collection_indexes:  
            tasks.append(self.create_index(index_item.name, index_item.sort))  
        asyncio.ensure_future(asyncio.gather(*tasks))  
  
    async def get_one_document(self, criteria: dict) -> Optional[dict]:  
        connection = await self.mongodb_connection_manager.get_connection()  
        collection = connection[self.db_name][self.collection_name]  
        return await collection.find_one(criteria)  
  
    async def get_list_document(  
            self,  
            criteria: dict,  
            sort_criteria: Optional[list] = None,  
            limit: Optional[int] = 0,  
            skip: Optional[int] = 0,  
    ) -> List[dict]:  
        if not sort_criteria:  
            sort_criteria = []  
        connection = await self.mongodb_connection_manager.get_connection()  
        cursor = connection[self.db_name][self.collection_name].find(  
            criteria,  
            limit=limit,  
            skip=skip,  
            sort=sort_criteria  
        )  
  
        result = list()  
        async for data in cursor:  
            result.append(data)  
        return result  
  
    async def save_document(self, data: dict) -> str:  
        connection = await self.mongodb_connection_manager.get_connection()  
        result = await connection[self.db_name][self.collection_name].insert_one(data)  
        return result.inserted_id  
  
    async def update_document(self, criteria: dict, data: dict) -> None:  
        connection = await self.mongodb_connection_manager.get_connection()  
        await connection[self.db_name][self.collection_name].update_one(criteria, {'$set': data})

Сервисный слой

Сервисный слой выполняет всю необходимую обработку с данными.

  • обращение в базу данных для поиска аналогичного запроса (request_id и паспортные данные);

  • отдать результат, если данные были найдены;

  • выполнить запрос к API;

  • сохранить результат запроса в базу данных;

  • вернуть ответ.

В сервисном слое попытался абстрагироваться от работы с инфраструктурой. Возврат ответа производится в вызывающую функцию, которая должна знать куда вернуть ответ. В данном случае, менеджер очередей "знает" куда ему ответить благодаря наличию поля reply-to в заголовке запроса. Возвращаемое значение оформлено в виде DTO-объекта (RequestDTO).

Код класса InnService:

Hidden text
class InnService:  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            client: NalogApiClient,  
            storage: RequestRepository  
    ) -> None:  
        self.settings = settings  
        self.logger = logger  
        self.client = client  
        self.storage_repository = storage  
  
    async def get_client_inn_from_storage(self, client_data: RequestSerializer) -> Optional[RequestModel]:  
        client_passport = f'{client_data.document_serial} {client_data.document_number}'  
        client_request = await self.storage_repository.find_request(client_passport, client_data.request_id)  
        return client_request  
  
    def update_status(self, model: RequestModel, inn: str, error: str) -> None:  
        model.inn = inn  
        model.error = error  
  
    async def get_client_inn(self, client_data: RequestSerializer) -> RequestDTO:  
        """Получение клиентского ИНН"""  
        start_process = datetime.utcnow()  
        model = RequestModel.create_from_request(client_data)  
  
        # Получить данные из БД  
        existing_data = await self.get_client_inn_from_storage(client_data)  
        if existing_data:  
            elapsed_time = (datetime.utcnow() - start_process).total_seconds()  
            return RequestDTO(  
                request_id=client_data.request_id,  
                inn=existing_data.inn,  
                elapsed_time=elapsed_time,  
                cashed=True  
            )  
  
        # Сделать фактический запрос в Nalog API  
        request = NalogApiRequestSerializer.create_from_request(client_data)  
        error, result = None, ''  
        try:  
            result = await self.client.send_request_for_inn(request)  
        except NalogApiClientException as exception:  
            self.logger.error('Error request to Nalog api service', details=str(exception))  
            error = str(exception)  
  
        self.update_status(model, result, error)  
        await self.storage_repository.save_request(model)  
  
        return RequestDTO(  
            request_id=model.request_id,  
            inn=model.inn,  
            details=model.error,  
            elapsed_time=model.elapsed_time  
        )

Второй сервис в приложении - это сервис опроса инфраструктуры для health-check. Инфраструктурные менеджеры, которые необходимо мониторить, должны наследоваться от миксины EventLiveProbeMixin и реализовать функцию is_connected.

Клиент

Клиент NalogApiClient предназначен для выполнения POST запроса к https://service.nalog.ru/inn.do и разбора статуса ответа. Функция непосредственного оформления запроса обёрнута в retry декоратор повторителя запроса при возникновении ошибок. Настройки повторителя в общих настройках приложения.

Hidden text
class NalogApiClient:  
    CLIENT_EXCEPTIONS = (  
        NalogApiClientException,  
        aiohttp.ClientProxyConnectionError,  
        aiohttp.ServerTimeoutError,  
    )  
  
    def __init__(self, settings: Settings, logger: AppLogger):  
        self.nalog_api_service_url = settings.client_nalog_url  
        self.request_timeout = settings.client_nalog_timeout_sec  
        self.retries_times = settings.client_nalog_retries  
        self.retries_wait = settings.client_nalog_wait_sec  
        self.logger = logger  
        self.timeout = aiohttp.ClientTimeout(total=self.request_timeout)  
  
    @property  
    def _headers(self):  
        return {  
            "Accept": "application/json, text/javascript, */*; q=0.01",  
            "Accept-Language": "ru-RU,ru",  
            "Connection": "keep-alive",  
            "Origin": "https://service.nalog.ru",  
            "Referer": self.nalog_api_service_url,  
            "Sec-Fetch-Dest": "empty",  
            "Sec-Fetch-Mode": "cors",  
            "Sec-Fetch-Site": "same-origin",  
            "Sec-GPC": "1",  
            "X-Requested-With": "XMLHttpRequest",  
        }  
  
    async def send_request_for_inn(self, nalog_api_request: NalogApiRequestSerializer) -> Optional[str]:  
        self.logger.debug(f'Request to nalog api service for {nalog_api_request.client_fullname}')  
  
        form_data = nalog_api_request.dict(by_alias=True)  
  
        @retry(self.CLIENT_EXCEPTIONS, logger=self.logger, attempts=self.retries_times, wait_sec=self.retries_wait)  
        async def make_request(client_session: aiohttp.ClientSession):  
            async with client_session.post(url=self.nalog_api_service_url, data=form_data) as response:  
                if response.status not in [http.HTTPStatus.OK, http.HTTPStatus.NOT_FOUND]:  
                    response_text = await response.text()  
                    raise NalogApiClientException(response_text)  
                data = await response.json()  
                code = data.get('code')  
                captcha_required = data.get('captchaRequired')  
                if captcha_required:  
                    raise NalogApiClientException(f'Captcha required for request {nalog_api_request.client_fullname}')  
                if code == 0:  
                    return 'no inn'  
                elif code == 1:  
                    return data.get('inn')  
                else:  
                    raise NalogApiClientException(f'Unable to parse response! Details: {response}')  
  
        async with aiohttp.ClientSession(timeout=self.timeout, headers=self._headers) as session:  
            return await make_request(session)

Контейнер

Контейнер предназначен для сборки необходимых зависимостей и передачи их в приложение. Наш контейнер собран в классе ApplicationContainer. Все зависимости пробрасываются в виде синглтонов @singleton и регистрируются как провайдеры зависимостей типов @provider предоставляемых библиотекой injector. При написании тестов необходимо подготовить другой контейнер с актуальными fake или stub-объектами.

Основной интерес по работе с контейнером сосредоточен в классе ContainerManager, который используется для проверки реализации миксин EventSubscriberMixin и EventLiveProbeMixin. Функция get_event_collection формирует списки функций обратного вызова для старта и выхода из приложения. Собственно проход по спискам и вызов функций обратного вызова реализован в функциях: run_startup и run_shutdown.

Hidden text
class ContainerManager:  
  
    def __init__(self, cls_container: Type[Container]) -> None:  
        self._container = Injector(cls_container())  
        self._bindings = self._container.binder._bindings  
  
    def get_container(self) -> Injector:  
        return self._container  
  
    def get_live_probe_handlers(self) -> List[Type[Callable]]:  
        result = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, EventLiveProbeMixin):  
                binding_obj = self._container.get(binding)  
                result.append(binding_obj.is_connected)  
        return result  
  
    def get_startup_handlers(self):  
        handlers = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, StartupEventMixin):  
                binding_obj = self._container.get(binding)  
                handlers.append(binding_obj.startup())  
        return handlers  
  
    def get_shutdown_handlers(self):  
        handlers = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, ShutdownEventMixin):  
                binding_obj = self._container.get(binding)  
                handlers.append(binding_obj.shutdown())  
        return handlers  
  
    async def run_startup(self) -> None:  
        exception = None  
        for handler in self.get_startup_handlers():  
            if exception:  
                handler.close()  
            else:  
                try:  
                    await handler  
                except Exception as exc:  
                    exception = exc  
  
        if exception is not None:  
            raise exception  
  
    async def run_shutdown(self) -> None:  
        handlers = []  
        for handler in self.get_shutdown_handlers():  
            handlers.append(handler)  
        await asyncio.gather(*handlers)

Собственно сам контейнер, в котором производится инициализация нужных экземпляров классов. При написании тестов будет создан аналогичный контейнер.

Hidden text
class ApplicationContainer(Container):  
  
    @singleton  
    @provider    
    def provide_settings(self) -> Settings:  
        return Settings()  
  
	# ... немного кода пропущено

    @singleton  
    @provider    
    def provide_mongodb_connection(self, settings: Settings, logger: AppLogger) -> MongoConnectionManager:  
        return MongoConnectionManager(settings, logger)  
  
    @singleton  
    @provider    
    def provide_rabbitmq_connection(self, settings: Settings, logger: AppLogger) -> RabbitConnectionManager:  
        return RabbitConnectionManager(settings, logger)  
  
    @singleton  
    @provider    
    def provide_nalog_api_client(self, settings: Settings, logger: AppLogger) -> NalogApiClient:  
        return NalogApiClient(settings, logger)  
  
    @singleton  
    @provider    
    def provide_request_repository(self, settings: Settings, mongo_connection: MongoConnectionManager) -> RequestRepository:  
        return RequestRepository(mongo_connection, settings)

Приложение

Основная задача приложения - собрать всё воедино и запустить общий поток выполнения. Код сборки приложения предельно простой, инициализацию классов выполняет менеджер контейнера. Сборка приложения выполняется следующими шагами:

  • получение контейнера, передача его в менеджер контейнеров;

  • инициализация event_loop;

  • добавление обработчиков для очередей;

  • запуск инициализаторов для инфраструктурного слоя (реализующих startup миксины);

  • запуск веб-сервера FastAPI для отдачи health-check;

  • включение глобального обработчика ошибок.

Hidden text
class Application:  
  
    def __init__(self, cls_container: Type[Container]) -> None:  
        self.loop = asyncio.get_event_loop()  
        self.container_manager = ContainerManager(cls_container)  
        self.container = self.container_manager.get_container()  
        self.settings = self.container.get(Settings)  
        self.logger = self.container.get(AppLogger)  
        self.live_probe_service = self.container.get(LiveProbeService)  
        self.queue_manager = self.container.get(QueueManager)  
        self.app_name = self.settings.app_name  
        self.http_server = None  
  
    def init_application(self):  
        self.http_server = ServerAPIManager(self.container)  
  
        request_handler = self.container.get(RequestHandler)  
        self.queue_manager.add_handler(request_handler)  
  
        live_probe_handlers = self.container_manager.get_live_probe_handlers()  
        for handler in live_probe_handlers:  
            self.live_probe_service.add_component(handler)  
  
    def run(self) -> None:  
        self.logger.info(f'Starting application {self.app_name}')  
  
        self.init_application()  
  
        try:  
            self.loop.run_until_complete(self.container_manager.run_startup())  
  
            tasks = asyncio.gather(  
                self.http_server.serve(),  
                self.queue_manager.run_handlers_async(),  
            )  
            self.loop.run_until_complete(tasks)  
  
            self.loop.run_forever()  
        except BaseException as exception:  
            exit(1)  
        finally:  
            self.loop.run_until_complete(self.container_manager.run_shutdown())  
  
            self.loop.close()  
            self.logger.info('Application disabled')

Приложение стартует из main-скрипта с использованием небольшой библиотеки typer. Маленькая библиотека имеет возможность удобно обрабатывать параметры командной строки.

Hidden text
import typer  
from core.application import Application  
from app_container import ApplicationContainer  
  
  
def main():  
    try:  
        application = Application(ApplicationContainer)  
        application.run()  
    except BaseException as exc:  
        typer.echo(f'Error starting application. Details: {str(exc)}')  
  
  
if __name__ == "__main__":  
    typer.run(main)

Как это всё запустить?

Проект содержит файл docker-compose для сборки. Необходимо скопировать файл .env.example в файл .env .

docker compose build
docker compose up

После выполнения этих команд, будет запущен экземпляр mongodb на 27017 порту и rabbitmq на 5672 порту с админкой на 15672. В административную панель RabbitMQ можно зайти по адресу http://localhost:15672. В разделе очередей необходимо создать новую очередь, в которую будут направляться результаты работы сервиса и прибиндить её к exchange по умолчанию (direct).

Продолжение следует

В статье рассмотрена тема разработки приложения на Python с использованием очередей, контейнером зависимостей и поддержкой health-check. Предлагаю обсудить архитектуру в комментариях, а затем продолжить развивать сервис. Следующими итерациями планирую добавить гипотетического не бесплатного клиента, которого будем использовать после определённого количества запросов в бесплатный сервис. И в завершении написать тесты.

Материалы, которые могут быть полезны для понимания материала:

Комментарии (6)


  1. funca
    16.04.2023 20:51
    +3

    Честно говоря двоякое чувство. С одной стороны интригующий выбор темы (с персональными данными, коммуникацией с госами), а с другой - реализация с массой недосказанностей. По сути я думал это статья к студенческой работе или тестовому заданию перспективного джуниора, пока не заглянул в профиль автора.

    Софт без задефайненных non-functional requirements, quality attributes и код без тестов это прототип. На этом уровне техническая задача обычно сводится к тому, чтобы решение лишь бы хоть как-то работало. Работает? - вы молодец. Но дальше в отсутствии требований об архитектуре сервиса говорить нет смысла потому, что архитектура выбирается исходя из требований. Вы же понимаете, что допустим для достижения availablity 99.9 и 99.999 у вас будут два принципиально разных архитектурных решения и разницей по сложности и стоимости примерно на один-два порядка.

    Но проблема даже не в этом. Принимая ФИО, паспортные данные и т.п., с точки зрения законодательства вы становитесь оператором персональных данных, со всеми вытекающими требованиями - к хранению, обработке и передаче. Я не увидел даже намека. Вы хотели отзыв: десять лет тому назад это выглядело бы даже неплохо (все эти базворды типа onion architecture, DI и т.п.). Но в 2023 предложенное решение - детский сад штаны на лямках - в решении уровня лида хочется все же видеть больше осмысленности.


    1. Filyushin Автор
      16.04.2023 20:51
      +1

      Спасибо за первый комментарий к первой статье.

      По интригующей теме работы с госами, наверное, не скажу ничего, потому как более интересной была бы реализация через СМЭВ. Однако здесь рассмотрен пример сервиса, работающего через очереди, построенный, да, как вы сказали, на устаревших базвордах луковичной архитектуры и контейнеров. Тема получения ИНН скорее второстепенна в статье. Это лишь полезная нагрузка для построения сервиса, некая абстрактная задача, которую можно развивать и на чём можно учиться.

      Согласен с отсутствием нефункциональных требований и метрик качества. Сформулировать сейчас не получится, но в одном из разделов указал на скорость ответа базового сервиса от налоговой. При "развитии" сервиса попытаемся сформулировать и эти требования.

      Про тесты написано в последнем разделе. Предвосхищая ваш вопрос почему сначала не пишем тесты, а потом код - так не было запроса на следование TDD.


      1. funca
        16.04.2023 20:51

        как вы сказали, на устаревших базвордах луковичной архитектуры и контейнеров.

        Я ни где не говорил, что это устарело. Скорее развилось и стало рутиной. А вот подходы к работе с PII поменялись радикально из-за необходимости соблюдать кучу юридических церемоний. В вашем решении данные не классифицируются, свободно передаваясь между компонентами. Поэтому с предложенной архитектурой бизнес влетит на мешок денег после первого же compliance report. Чтобы этого не произошло, такие требования нужно закладывать сразу. Попробуйте, вы сразу увидите как это заставит перекроить всю архитектуру.

        Поэтому, если нет желания показывать свои навыки в проектировании конкретно приложений для обработки PII, для демо сейчас выбирают что-то более нейтральное, типа API прогноза погоды, котировок валют или подобное. Это будет в разы проще.

        Что касается тестов, то писать их до или после это тактика. Главное, чтобы в конце они появились. :) Ну и линтер с тайпчекером прикрутить к CI вроде уже не экзотика.


    1. WhiskyBar
      16.04.2023 20:51
      +1

      Двоякое чувство от вашего комментария, с одной стороны вроде как по делу, если мы говорим о неком продовом решении, или как ответ на конкретное ТЗ, с другой стороны нигде не заявлялось, что в статье будут рассмотрены данные аспекты:

      «Основная идея статьи — поделиться опытом работы с Python в части создания законченного проекта с использованием контейнера зависимостей, создания слушателей для RabbitMQ и работой с базой данных MongoDB.»

      Возможно часть дискуссии ведется вне данной площадки, и я встреваю не в чужой диалог, но ради интереса посмотрел, о чем вы писали "лет 10 назад".


      1. askolo4ek
        16.04.2023 20:51

        Честно говоря, двоякое чувство от вашего комментария. С одной стороны вроде как по делу, если мы говорим о некотором конкретном пользователе, с другой стороны не придумал пока)


  1. The_Immortal
    16.04.2023 20:51

    Скорее актуальнее получать ИИН ;)