Приветствую, дорогие коллеги и энтузиасты мира разработки!

Сегодня я рад представить вам новый увлекательный проект, который, несомненно, заинтересует как начинающих, так и опытных разработчиков. Речь пойдет о создании API-сервиса на базе FastAPI для мониторинга актуальных курсов валют в банках России.

Концепция проекта

Мы создадим надежный и быстрый инструмент для отслеживания валютных курсов, который найдет применение как в личных финансах, так и в бизнес-сфере. Для реализации проекта будет использован следующий стек технологий:

  • FastAPI — для создания высокопроизводительного API.

  • Aiohttp — для асинхронных HTTP-запросов.

  • BeautifulSoup4 (BS4) — для эффективного парсинга данных.

  • APScheduler — для выполнения задач по расписанию.

  • SQLAlchemy с Aiosqlite — для асинхронной работы с базой данных.

Архитектура и ключевые компоненты

Проект состоит из нескольких ключевых модулей:

  1. Асинхронный парсер агрегатора курсов валют (Aiohttp + BS4).

  2. Планировщик задач для регулярного обновления данных, интегрированный в жизненный цикл FastAPI приложения (lifespan).

  3. Система аутентификации с разграничением прав доступа.

  4. API-методы для взаимодействия с данными о курсах валют.

Функциональность банковского API

Сервис предоставит пользователям следующие возможности:

  • Получение актуальных курсов USD и EUR по всем банкам РФ.

  • Фильтрация курсов по диапазонам цен.

  • Определение банков с наиболее выгодными курсами.

  • Анализ минимальных и максимальных цен на валюту (продажа и покупка).

Процесс разработки и деплой

В ходе статьи мы подробно разберем:

  • Каждый этап разработки.

  • Ключевые решения и возможные трудности.

  • Особенности асинхронного программирования и эффективной работы с данными.

Финальный этап:

Мы выполним быстрый деплой нашего API на Amvera Cloud, что обеспечит:

  • Бесплатный HTTPS-домен.

  • Простоту интеграции с различными платформами (Telegram-бот, веб-сайт или мобильное приложение).

Дисклеймер и контекст

Прежде чем погрузиться в тему, хочу сделать небольшое предисловие. В предыдущих статьях я уже подробно рассказывал о работе с SQLAlchemy, Alembic и FastAPI. Чтобы не повторяться, сегодня я буду часто ссылаться на эти материалы. Если потребуется более глубокое понимание, рекомендую ознакомиться с соответствующими статьями.

О "болванке" для API

Сегодняшний API мы будем строить на основе моей "болванки" — базового шаблона, который я активно использую в рабочих проектах. Подробную презентацию шаблона можно найти в моей предыдущей статье.

Для тех, кто хочет первым получать эксклюзивный контент, не публикуемый на Хабре, приглашаю в мой Telegram-канал "Легкий путь в Python", в частности, там вы найдете полный исходный код сегодняшнего проекта.

Краткий обзор шаблона

Моя "болванка" — это готовый шаблон для разработки масштабируемых веб-приложений на базе FastAPI. Ключевые особенности:

  • Полная поддержка аутентификации и авторизации.

  • Модульная архитектура.

  • Гибкое логирование с использованием Loguru.

  • Асинхронное взаимодействие с базой данных через SQLAlchemy.

  • Удобная система миграций на базе Alembic.

  • Универсальный класс для работы с БД.

  • Модули для кастомной авторизации, регистрации и аутентификации.

Этот шаблон значительно упрощает и ускоряет подготовку инфраструктуры для новых проектов.

Если найдете этот проект полезным, буду признателен за вашу поддержку в виде звездочки на GitHub! Также буду рад узнать в комментариях, как он помог в вашей работе.

Ссылка на проект: FastAPI Template with Auth

Подготовка проекта

Как упоминалось ранее, для работы мы воспользуемся готовым шаблоном, который будет расширен дополнительными API-методами и моделями таблиц для SQLAlchemy. Из «коробки» вы получите проект с базовыми API-методами и моделями таблиц, обеспечивающими функциональность регистрации, авторизации и аутентификации пользователей с поддержкой ролей.

После входа в систему пользователь будет получать JWT-токен, который будет использоваться для проверки прав доступа к методам.

Подробнее про JWT-токены и общую логику аутентификации в FastAPI вы можете прочитать в моей статье:

Важные материалы по SQLAlchemy

В рамках проекта основное внимание будет уделено работе с SQLAlchemy. Рекомендую ознакомиться с моими статьями, чтобы углубиться в тему:

  1. Асинхронный SQLAlchemy 2: простой пошаговый гайд по настройке, моделям, связям и миграциям с использованием Alembic.

  2. Асинхронный SQLAlchemy 2: пошаговый гайд по управлению сессиями, добавлению и извлечению данных с Pydantic.

  3. Асинхронный SQLAlchemy 2: улучшение кода, методы обновления и удаления данных.

Настройка проекта

Шаг 1. Клонирование репозитория

Откройте терминал и выполните следующую команду, чтобы скопировать проект на свой компьютер:

git clone https://github.com/Yakvenalex/FastApiWithAuthSample

Эта команда создаст новую папку с проектом, содержащую полную копию репозитория.

Шаг 2. Переход в директорию проекта

Перейдите в папку с проектом:

cd FastApiWithAuthSampl

Теперь проект готов для установки зависимостей и дальнейших шагов по настройке. Откройте его в удобной IDE, например, PyCharm.

Расширение файла requirements.txt

Добавьте следующие модули в файл requirements.txt:

aiohttp==3.11.2
bs4==0.0.2
apscheduler==3.10.4
  • Aiohttp: библиотека для выполнения асинхронных GET-запросов к агрегатору https://ru.myfin.by.

  • BS4: инструмент для парсинга HTML и извлечения нужных данных.

  • APSCHEDULER: будет использоваться для запуска асинхронного парсера по расписанию (каждые 10 минут).

На выходе файл requirements.txt должен выглядеть так:

fastapi[all]==0.115.0
pydantic==2.9.2
pydantic[email]
uvicorn==0.31.0
jinja2==3.1.4
pydantic_settings==2.5.2
aiosqlite==0.20.0
alembic==1.13.3
SQLAlchemy==2.0.35
bcrypt==4.0.1
passlib[bcrypt]==1.7.4
python-jose==3.3.0
loguru==0.7.2
aiohttp==3.11.2
bs4==0.0.2
apscheduler==3.10.4

Установка зависимостей

Выполните команду для установки всех зависимостей:

pip install -r requirements.txt

Текущая реализация AUTH

Модуль авторизации, аутентификации и ролей в этом проекте уже реализован. Если вы клонировали мой репозиторий, то база данных также была скопирована, поэтому необходимость в выполнении миграций через Alembic отсутствует.

Для проверки запустите приложение FastAPI. Выполните в корне проекта:

uvicorn app.main:app --reload

Если порт 8000 занят, укажите другой, например:

uvicorn app.main:app --port 8900 --reload

Проверка работы

После запуска приложения в терминале вы увидите вывод, подтверждающий его успешную работу. Теперь откройте в браузере адрес:

http://127.0.0.1:8900/docs

На этой странице вы сможете протестировать методы API, проверить их корректность и убедиться в работе всей системы.

После проверки всех методов можно переходить к следующему этапу разработки.

Подготовка к созданию парсера

Теперь мы детально сосредоточимся на написании асинхронного парсера данных. Но сначала давайте в корневой папке app создадим папку api и добавим в неё файл schemas.py.

Если вы читали мои предыдущие статьи по FastAPI, то знаете, что я придерживаюсь подхода, при котором каждый отдельный блок API размещается в отдельной папке. Обычно такая папка содержит следующие файлы:

  • dao.py: файл с методами SQLAlchemy, предназначенными для работы с конкретной сущностью API. В нашем случае — это методы для работы с "банковским API".

  • models.py: содержит модели SQLAlchemy для сущности API (или микросервиса). Сегодня мы создадим таблицу для хранения курсов валют по банкам.

  • schemas.py: Pydantic-модели для валидации данных конкретного микросервиса. Эти модели будут использоваться в парсере. Если вы не знакомы с Pydantic, рекомендую мою статью Pydantic 2: Полное руководство для Python-разработчиков.

  • router.py: файл с основной функциональностью микросервиса.

Создание схем

Добавим в файл app/api/schemas.py следующую модель:

from pydantic import BaseModel, ConfigDict


class BankNameSchema(BaseModel):
    bank_en: str


class CurrencyRateSchema(BankNameSchema):
    bank_name: str
    link: str
    usd_buy: float
    usd_sell: float
    eur_buy: float
    eur_sell: float
    update_time: str

    model_config = ConfigDict(from_attributes=True)

CurrencyRateSchema будет использоваться в парсере для валидации собранных данных перед их добавлением в таблицу с банками.

Подготовка модели таблицы

Создадим модель таблицы для хранения данных о курсах валют. В файле app/api/models.py добавим следующий код:

from sqlalchemy.orm import Mapped
from app.dao.database import Base, str_uniq, float_col


class CurrencyRate(Base):
    # Название банка (на русском)
    bank_name: Mapped[str_uniq]

    # Название банка (на английском, для поиска)
    bank_en: Mapped[str_uniq]

    # Ссылка на страницу с курсами валют
    link: Mapped[str_uniq]

    # Курсы валют: покупка и продажа USD
    usd_buy: Mapped[float_col]
    usd_sell: Mapped[float_col]

    # Курсы валют: покупка и продажа EUR
    eur_buy: Mapped[float_col]
    eur_sell: Mapped[float_col]

    # Время последнего обновления
    update_time: Mapped[str]

Поля id, updated_at и created_at добавляются автоматически, так как они наследуются из базового класса Base.

Миграция таблицы

Выполним миграцию новой таблицы с помощью Alembic. Для начала зарегистрируем таблицу в файле migrations/env.py. Найдите строку:

from app.auth.models import Role, User

И добавьте под ней:

from app.api.models import CurrencyRate

Теперь выполните две команды:

  1. Создание файла с инструкциями для миграции:

    alembic revision --autogenerate -m "add currency table"
    
  2. Применение миграции:

    alembic upgrade head
    

После этого в базе данных появится новая таблица.

Создание DAO

Для работы с таблицей создадим класс доступа к данным (DAO). В файле app/api/dao.py добавим следующий код:

from app.api.models import CurrencyRate
from app.dao.base import BaseDAO


class CurrencyRateDAO(BaseDAO):
    model = CurrencyRate

Этот класс наследуется от BaseDAO, предоставляя универсальные методы для работы с базой данных. Однако нам понадобится написать собственные методы, включая метод для массового обновления записей.

Массовое обновление данных

Метод массового обновления курсов валют будет учитывать, что в источнике данных (сайт-донор) отсутствует поле ID, сопоставимое с нашим. Вместо этого мы используем поле bank_en — уникальное название банка на английском языке.

Добавим следующий метод в CurrencyRateDAO:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql.expression import update
from typing import List
from pydantic import BaseModel
import logging

logger = logging.getLogger(__name__)

class CurrencyRateDAO(BaseDAO):
    model = CurrencyRate

    @classmethod
    async def bulk_update_currency(cls, session: AsyncSession, records: List[BaseModel]) -> int:
        """Массовое обновление валютных курсов."""
        try:
            updated_count = 0
            for record in records:
                record_dict = record.model_dump(exclude_unset=True)
                if not (bank_en := record_dict.get('bank_en')):
                    logger.warning("Пропуск записи: отсутствует bank_en")
                    continue

                update_data = {k: v for k, v in record_dict.items() if k != 'bank_en'}
                if not update_data:
                    logger.warning(f"Пропуск записи: нет данных для обновления банка {bank_en}")
                    continue

                stmt = update(cls.model).where(cls.model.bank_en == bank_en).values(**update_data)
                result = await session.execute(stmt)
                updated_count += result.rowcount

            await session.commit()
            logger.info(f"Обновлено записей: {updated_count}")
            return updated_count
        except SQLAlchemyError as e:
            await session.rollback()
            logger.error(f"Ошибка массового обновления: {e}")
            raise

Объяснение метода

  1. Валидация входных данных: Проверяется наличие bank_en и данных для обновления.

  2. Формирование SQL-запроса: Используется update для массового обновления записей.

  3. Обработка исключений: При ошибке выполнения откатываются изменения, а ошибка логируется.

На этом этапе мы подготовили основные компоненты для работы с таблицей валютных курсов и реализовали метод для массового обновления. В дальнейшем мы вернёмся к этому классу, чтобы добавить методы для работы с "банковским API".

Следующий шаг — создание асинхронного парсера.

Пишем асинхронный парсер

Для создания парсера в папке app создадим новую папку scheduler. В этой папке разместим два файла:

  • parser.py: асинхронный парсер данных.

  • scheduler.py: скрипты для запуска задач по расписанию.

Настройка scheduler/parser.py

Импорты

import aiohttp
import asyncio
from loguru import logger
from bs4 import BeautifulSoup
from aiohttp import ClientSession, ClientTimeout, ClientError
from typing import List, Optional
from pydantic import BaseModel
from app.api.schemas import CurrencyRateSchema

Объяснение импортов:

  • aiohttp и asyncio: для работы с асинхронным кодом и HTTP-запросами.

  • loguru.logger: удобная библиотека для логирования.

  • BeautifulSoup: для парсинга HTML.

  • aiohttp.ClientSession, ClientTimeout, ClientError: работа с HTTP-сессиями, таймаутами и обработка ошибок.

  • typing.List, typing.Optional: аннотации типов для списков и опциональных значений.

  • pydantic.BaseModel: валидация данных.

  • app.api.schemas.CurrencyRateSchema: используемая нами модель данных.

Асинхронное получение HTML с обработкой ошибок

async def fetch_html(url: str, session: ClientSession, retries: int = 3) -> Optional[str]:
    attempt = 0
    while attempt < retries:
        try:
            async with session.get(url) as response:
                response.raise_for_status()  # Вызывает исключение при ошибке HTTP
                return await response.text()
        except (ClientError, asyncio.TimeoutError) as e:
            logger.error(f"Ошибка при запросе {url}: {e}")
            attempt += 1
            if attempt == retries:
                logger.critical(f"Не удалось получить данные с {url} после {retries} попыток.")
                return None
            await asyncio.sleep(2 ** attempt)  # Экспоненциальная задержка
        except Exception as e:
            logger.error(f"Неизвестная ошибка при запросе {url}: {e}")
            return None
    return None

Эта функция отправляет HTTP-запрос, повторяет попытки при неудаче и логирует ошибки.

Парсинг HTML-кода

def parse_currency_table(html: str) -> List[BaseModel]:
    soup = BeautifulSoup(html, 'html.parser')
    try:
        table = soup.find('table', class_='content_table').find('tbody')
        rows = table.find_all('tr')
        currencies = []

        for row in rows:
            bank_name = row.find('td', class_='bank_name').get_text(strip=True)
            link = row.find('a')
            try:
                usd_buy = float(row.find_all('td', class_='USD')[0].get_text(strip=True).replace(',', '.'))
                usd_sell = float(row.find_all('td', class_='USD')[1].get_text(strip=True).replace(',', '.'))
                eur_buy = float(row.find_all('td', class_='EUR')[0].get_text(strip=True).replace(',', '.'))
                eur_sell = float(row.find_all('td', class_='EUR')[1].get_text(strip=True).replace(',', '.'))
            except (ValueError, IndexError) as e:
                logger.warning(f"Ошибка при парсинге курсов валют для {bank_name}: {e}")
                continue

            update_time = row.find('time').get_text(strip=True)
            link_info = get_link_info(link)
            currencies.append(CurrencyRateSchema(**{
                'bank_name': bank_name,
                'bank_en': link_info[1],
                'link': link_info[0],
                'usd_buy': usd_buy,
                'usd_sell': usd_sell,
                'eur_buy': eur_buy,
                'eur_sell': eur_sell,
                'update_time': update_time,
            }))
        return currencies
    except Exception as e:
        logger.error(f"Ошибка при парсинге HTML: {e}")
        return []

Эта функция парсит HTML-страницу, полученную после GET-запроса к агрегатору, и извлекает данные о банках и курсах валют, возвращая список объектов CurrencyRateSchema.

Объединение логики

async def fetch_page_data(url: str, session: ClientSession) -> List[BaseModel]:
    html = await fetch_html(url, session)
    if html:
        return parse_currency_table(html)
    return []

Главная функция парсера

async def fetch_all_currencies() -> List[BaseModel]:
    all_currencies = []
    base_url = 'https://ru.myfin.by/currency?page='

    timeout = ClientTimeout(total=10, connect=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = [fetch_page_data(f'{base_url}{page}', session) for page in range(1, 5)]
        results = await asyncio.gather(*tasks)

        for currencies in results:
            all_currencies.extend(currencies)

    return all_currencies

Здесь используется asyncio.gather для параллельной обработки запросов. Это значительно ускоряет сбор данных.

Настройка scheduler.py

from app.api.dao import CurrencyRateDAO
from app.dao.session_maker import session_manager
from app.scheduler.parser import fetch_all_currencies

@session_manager.connection(commit=True)
async def add_data_to_db(session):
    rez = await fetch_all_currencies()
    await CurrencyRateDAO.add_many(session=session, instances=rez)

@session_manager.connection(commit=True)
async def upd_data_to_db(session):
    rez = await fetch_all_currencies()
    await CurrencyRateDAO.bulk_update_currency(session=session, records=rez)

Объяснение функций:

  1. add_data_to_db: первичное добавление данных.

  2. upd_data_to_db: обновление данных.

Тут я продемонстрировал гибкость подхода в использовании моего универсального класса для генерации сессий. В данном случае мы импортировали session_manager. Эксклюзивное пояснение того, как это все работает я давал в своем телеграмм канале, но, думаю, вы и сами разберетесь с этим.

Первый метод (add_data_to_db) выполняет первичное добавление информации по банкам в базу данных. На этом этапе выполняется, как сам парсер, так и добавление полученных данных в базу данных.

Для добавления используется универсальный метод add_many базового класса BaseDao.

И с ним есть важный момент. В реальной практике данный метод нужно усилить и внедрять его в расписание на выполнение, например, один раз при запуске приложения. Но, так как проект учебный, я предлагаю его выполнить всего 1 раз прямо с файла в котором написан этот код.

Одноразовый запуск добавления данных

import asyncio

asyncio.run(add_data_to_db())

После выполнения можно удалить эти строки кода.

Проверим собраны ли данные.

 Информация записана и на момент записи у меня 70 банков.
Информация записана и на момент записи у меня 70 банков.

Теперь, когда у нас есть что обновлять мы можем включить метод для обновления данных в расписание FastApi. Предлагаю сделать, чтоб парсер запускался 1 раз в 10 минут. Не вижу смысла делать это чаще.

Включаем задачу в расписание

Чтобы включить задачу в расписание, нам нужно внести изменения в файл main.py. Основная задача — описать жизненный цикл приложения FastAPI.

Добавление импортов

from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from app.scheduler.scheduller import upd_data_to_db

Разберем подробнее:

  • from contextlib import asynccontextmanager
    Позволяет создавать асинхронные контекстные менеджеры для управления ресурсами в асинхронном коде.

  • from apscheduler.schedulers.asyncio import AsyncIOScheduler
    Асинхронный планировщик задач, который работает с циклом событий asyncio.

  • from apscheduler.triggers.interval import IntervalTrigger
    Используется для создания триггеров, которые запускают задачи с заданным интервалом.

  • from app.scheduler.scheduller import upd_data_to_db
    Импорт функции, которая будет обновлять данные в базе данных по расписанию.

Создание планировщика

scheduler = AsyncIOScheduler()

Объект scheduler будет отвечать за планирование и выполнение задач.

Функция управления жизненным циклом приложения

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Управляет жизненным циклом планировщика приложения.

    Args:
        app (FastAPI): Экземпляр приложения FastAPI.
    """
    try:
        # Настройка и запуск планировщика
        scheduler.add_job(
            upd_data_to_db,
            trigger=IntervalTrigger(minutes=10),
            id='currency_update_job',
            replace_existing=True
        )
        scheduler.start()
        logger.info("Планировщик обновления курсов валют запущен")
        yield
    except Exception as e:
        logger.error(f"Ошибка инициализации планировщика: {e}")
    finally:
        # Завершение работы планировщика
        scheduler.shutdown()
        logger.info("Планировщик обновления курсов валют остановлен")

Описание кода:

  1. Декоратор @asynccontextmanager:
    Позволяет функции lifespan работать как асинхронный контекстный менеджер.

  2. Блок try:

    • Добавляет задачу upd_data_to_db в планировщик с 10-минутным интервалом.

    • Запускает планировщик.

    • Логирует успешный запуск.

  3. yield:
    Позволяет приложению продолжить выполнение после запуска планировщика.

  4. Блок except:
    Логирует ошибки инициализации.

  5. Блок finally:
    Останавливает планировщик при завершении работы приложения и логирует это.

Интеграция жизненного цикла в приложение

app = FastAPI(lifespan=lifespan)

Что мы сделали:

  1. Создали экземпляр приложения FastAPI.

  2. Добавили асинхронный контекстный менеджер lifespan для управления жизненным циклом приложения.

  3. Интеграция позволяет автоматически запускать и завершать работу планировщика.

Остальная часть кода в main.py остается без изменений.

Проверка работы

Теперь достаточно перезапустить приложение. Планировщик начнет автоматически обновлять данные раз в 10 минут, и это будет отражено в логах.

Следующий шаг

Когда задачи, связанные с расписанием, завершены, можно переходить к созданию API-методов для работы с "банковским API". Приступим!

Создание банковского API

Мы уже создали папку app/api и продолжим с ней дальнейшую работу.

На этом этапе нам необходимо реализовать дополнительные методы для работы с данными от агрегатора, создать сопутствующие схемы Pydantic и, наконец, реализовать сами эндпоинты (API-методы).

Напишем несколько простых эндпоинтов, которые не потребуют создания дополнительных методов для работы с таблицами. Мы сможем ограничиться универсальными методами из BaseDao. Реализуем следующие API-методы:

  1. Метод, который возвращает актуальные курсы валют всех банков для авторизованных пользователей.

  2. Метод, который возвращает курсы валют конкретного банка по его английскому названию для авторизованных пользователей.

  3. Метод, который возвращает расширенную информацию о курсах валют (только для администраторов).

Работаем с файлом app/api/router.py.

Импорты

Начнем с импортов:

from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.utils import validate_range, validate_currency_type, get_currency_ranges
from app.auth.dependencies import get_current_user, get_current_admin_user
from app.auth.models import User
from app.config import settings
from app.dao.session_maker import SessionDep
from app.api.dao import CurrencyRateDAO
from app.api.schemas import (
    CurrencyRateSchema, BankNameSchema, CurrencyRangeFilterSchema,
    AdminCurrencySchema, CurrencySaleRangeFilterSchema, BestRateResponse
)

Обратите внимание, что на блок со схемами пока не нужно заострять внимание, так как недостающие схемы мы заполним чуть позже.

Важные детали

  1. Импорт SessionDep: Этот объект из пакета dao — специальный генератор асинхронной сессии, созданный под FastAPI. Его особенность в том, что он не выполняет коммит в базе данных после операций.

  2. Импорты из utils: Здесь подключены функции, вынесенные в файл api/utils.py, который мы скоро создадим.

  3. Настройки: Для работы с переменными окружения и настройками используется модуль config.

Давайте реализуем недостающий функционал. Начнем с настроек.

Изменение файла app/config.py

Изменим файл следующим образом:

import os
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    BASE_DIR: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
    DB_URL: str = f"sqlite+aiosqlite:///{BASE_DIR}/data/db.sqlite3"
    SECRET_KEY: str
    ALGORITHM: str
    VALID_CURRENCIES: list = ["usd", "eur"]
    ERROR_MESSAGES: dict = {
        "currency_type": "Некорректный тип валюты. Используйте 'usd' или 'eur'.",
        "range": "Неверно задан диапазон.",
        "not_found": "Не найдены курсы валют.",
        "bank_not_found": "Банк не найден."
    }
    CURRENCY_FIELDS: dict = {
        'usd': {'buy': 'usd_buy', 'sell': 'usd_sell'},
        'eur': {'buy': 'eur_buy', 'sell': 'eur_sell'}
    }
    model_config = SettingsConfigDict(env_file=f"{BASE_DIR}/.env")


# Получаем параметры для загрузки переменных среды
settings = Settings()
database_url = settings.DB_URL

Что добавлено?

  1. VALID_CURRENCIES: Список поддерживаемых валют для API.

  2. ERROR_MESSAGES: Словарь с заготовленными сообщениями об ошибках для различных ситуаций.

  3. CURRENCY_FIELDS: Словарь с ключами, которые будут использоваться в методах SQLAlchemy.

Эти переменные введены для оптимизации и упрощения кода. Хотя можно было не выносить их в класс Settings, такой подход обеспечивает чистоту и повторное использование.

Теперь у нас есть доступ ко всем настройкам через объект класса Settings, включая переменные из файла .env и добавленные вручную. Например, чтобы обратиться к переменной, достаточно использовать точечный синтаксис:

  • settings.VALID_CURRENCIES.

Создание файла app/api/utils.py

В следующем шаге мы создадим файл utils.py в папке app/api, чтобы вынести часть кода для оптимизации и экономии времени.

Приложу полный код и после прокомментирую:

from typing import Tuple

from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.dao import CurrencyRateDAO
from app.config import settings


def validate_currency_type(currency_type: str) -> str:
    """Проверяет корректность типа валюты."""
    if currency_type.lower() not in settings.VALID_CURRENCIES:
        raise HTTPException(status_code=400, detail=settings.ERROR_MESSAGES["currency_type"])
    return currency_type.lower()


def validate_range(min_val: float, max_val: float) -> None:
    """Проверяет корректность диапазона значений."""
    if min_val > max_val:
        raise HTTPException(status_code=400, detail=settings.ERROR_MESSAGES["range"])


async def get_currency_ranges(
        currency_type: str,
        operation: str,
        session: AsyncSession
) -> Tuple[Tuple[float, float], Tuple[float, float]]:
    """Получает диапазоны для основной и альтернативной валюты."""
    other_currency = 'eur' if currency_type == 'usd' else 'usd'

    requested_range = await CurrencyRateDAO.get_currency_range(
        currency=currency_type,
        operation=operation,
        session=session
    )
    other_range = await CurrencyRateDAO.get_currency_range(
        currency=other_currency,
        operation=operation,
        session=session
    )

    return requested_range, other_range

Краткое описание функций

  1. validate_currency_type(currency_type: str) -> str:

    • Проверяет, что указанный тип валюты (например, "USD" или "EUR") допустим.

    • Если тип валюты недопустим, вызывает исключение HTTPException с кодом 400 и сообщением об ошибке.

    • Возвращает тип валюты в нижнем регистре.

  2. validate_range(min_val: float, max_val: float) -> None:

    • Проверяет корректность диапазона (например, минимальное значение не должно быть больше максимального).

    • Если диапазон некорректен, вызывает исключение HTTPException с кодом 400.

  3. get_currency_ranges(currency_type: str, operation: str, session: AsyncSession) -> Tuple[Tuple[float, float], Tuple[float, float]]:

    • Асинхронно запрашивает диапазоны курсов для двух валют:

      • Основная валюта (заданная currency_type).

      • Альтернативная валюта (определяется как "USD", если основная — "EUR", и наоборот).

    • Использует DAO (Data Access Object) для получения данных из базы через сессию AsyncSession.

    • Возвращает два диапазона в виде кортежей чисел.

Когда мы включим этот код в наши API-методы, станет понятнее, зачем они вынесены в отдельный файл.

Назначение роутера в файле app/api/router.py

Определяем роутер:

router = APIRouter(prefix='/api', tags=['API'])

Первый эндпоинт: получение всех курсов валют

@router.get("/all_currency/")
async def get_all_currency(
        user_data: User = Depends(get_current_user),
        session: AsyncSession = SessionDep
) -> List[CurrencyRateSchema]:
    """Возвращает актуальные курсы валют всех банков."""
    return await CurrencyRateDAO.find_all(session=session, filters=None)

Разбор эндпоинта

  1. Путь: GET-запрос на /api/all_currency (префикс /api добавлен через APIRouter).

  2. Зависимости:

    • get_current_user: Проверяет авторизацию пользователя.

    • SessionDep: Генерирует асинхронную сессию для работы с базой данных.

  3. Логика:

    • Используется универсальный метод find_all из BaseDao для получения всех записей без фильтров.

    • Сессия передается первым аргументом для обеспечения корректной работы.

Зависимости в FastAPI

- get_current_user

Эта зависимость у вас уже есть, если вы использовали мой шаблон. Сама она из микросервиса auth и она выполняет проверки:

  • Есть ли в cookies JWT-токен.

  • Не истек ли срок действия токена.

  • Удается ли получить информацию о пользователе.

SessionDep

Генератор сессий, который автоматически управляет жизненным циклом соединений с базой данных. Зависимость выполняется до основной логики эндпоинта. Тоже у вас есть если работаете с моим шаблоном.

Совет: Всегда явно передавайте сессию как именованный аргумент, чтобы избежать потенциальных проблем и повысить читаемость кода.

Эндпоинт: курсы валют конкретного банка

@router.get("/currency_by_bank/{bank_en}")
async def get_currency_by_bank(
        bank_en: str,
        user_data: User = Depends(get_current_user),
        session: AsyncSession = SessionDep
) -> CurrencyRateSchema | None:
    """Возвращает курсы валют конкретного банка по его английскому названию."""
    currencies = await CurrencyRateDAO.find_one_or_none(session=session, filters=BankNameSchema(bank_en=bank_en))
    if not currencies:
        raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["bank_not_found"])
    return currencies

Разбор эндпоинта

  1. Путь: GET-запрос на /api/currency_by_bank/{bank_en}.

  2. Логика:

    • Метод принимает параметр bank_en — английское название банка.

    • Используется метод find_one_or_none из BaseDao для поиска записи.

    • Если банк не найден, вызывается исключение HTTPException с кодом 404, используя сообщение из settings.ERROR_MESSAGES.

Эндпоинт: расширенная информация о курсах валют (для администраторов)

@router.get("/all_currency_admin/")
async def get_all_currency_admin(
        user_data: User = Depends(get_current_admin_user),
        session: AsyncSession = SessionDep
) -> List[AdminCurrencySchema]:
    """Возвращает расширенную информацию о курсах валют (только для админов)."""
    return await CurrencyRateDAO.find_all(session=session, filters=None)

Новая схема Pydantic

Для работы этого эндпоинта создадим схему:

class AdminCurrencySchema(CurrencyRateSchema):
    id: int
    created_at: datetime
    updated_at: datetime

Эта схема наследуется от CurrencyRateSchema и добавляет три поля: id, created_at и updated_at.

В новом эндпоинте используется зависимость get_current_admin_user:

async def get_current_admin_user(current_user: User = Depends(get_current_user)):
    if current_user.role.id in [3, 4]:
        return current_user
    raise ForbiddenException

Эта зависимость проверяет, что пользователь имеет административную роль. Она расширяет функциональность get_current_user, добавляя дополнительную проверку.

Дополнение для взаимодействия с базой данных

Для создания новых эндпоинтов (API-методов) нужно написать дополнительные методы в CurrencyRateDAO, которые используют SQLAlchemy.

Примечание: Для понимания работы с CurrencyRateDAO рекомендуется иметь базовые знания SQLAlchemy. Если нужно освежить знания, можно обратиться к моим статьям на Хабре, где я подробно разбираю основы SQLAlchemy 2.0.

Пишем новые DAO методы

Сейчас я продемонстрирую полный код класса CurrencyRateDAO со всеми новыми методами и после коротко разберу каждый метод, чтоб не занимать много вашего и своего времени. В целом, должно быть все понятно.

from pydantic import BaseModel
from sqlalchemy import select, update, desc
from sqlalchemy.exc import SQLAlchemyError
from typing import List, Optional, Tuple, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.models import CurrencyRate
from app.api.schemas import CurrencyRateSchema, BestRateResponse
from app.config import settings
from app.dao.base import BaseDAO
from loguru import logger


class CurrencyRateDAO(BaseDAO):
    model = CurrencyRate

    @classmethod
    async def _get_value_range(cls, session: AsyncSession, field: str) -> Tuple[float, float]:
        """Получает минимальное и максимальное значение для указанного поля."""
        try:
            result = await session.execute(select(getattr(cls.model, field)))
            values = result.scalars().all()
            return (min(values), max(values)) if values else (0.0, 0.0)
        except SQLAlchemyError as e:
            logger.error(f"Ошибка при получении диапазона для {field}: {e}")
            raise

    @classmethod
    async def _find_by_range(
            cls,
            field_name: str,
            min_value: float,
            max_value: float,
            session: AsyncSession
    ) -> List[CurrencyRateSchema]:
        """Поиск валютных курсов по диапазону для указанного поля."""
        try:
            query = select(cls.model).filter(
                getattr(cls.model, field_name).between(min_value, max_value)
            )
            result = await session.execute(query)
            records = result.scalars().all()
            return [CurrencyRateSchema.from_orm(record) for record in records]
        except SQLAlchemyError as e:
            logger.error(f"Ошибка при поиске по диапазону {field_name}: {e}")
            raise

    @classmethod
    async def find_by_range_multi(
            cls,
            ranges: Dict[str, Tuple[float, float]],
            session: AsyncSession
    ) -> List[CurrencyRateSchema]:
        """Поиск валютных курсов по нескольким диапазонам."""
        results = []
        for field, (min_val, max_val) in ranges.items():
            results.extend(await cls._find_by_range(field, min_val, max_val, session))
        return results

    @classmethod
    async def find_by_purchase_range(
            cls,
            usd_buy_min: float,
            usd_buy_max: float,
            eur_buy_min: float,
            eur_buy_max: float,
            session: AsyncSession
    ) -> List[CurrencyRateSchema]:
        """Поиск курсов по диапазону покупки USD/EUR."""
        ranges = {
            'usd_buy': (usd_buy_min, usd_buy_max),
            'eur_buy': (eur_buy_min, eur_buy_max)
        }
        return await cls.find_by_range_multi(ranges, session)

    @classmethod
    async def find_by_sale_range(
            cls,
            usd_sell_min: float,
            usd_sell_max: float,
            eur_sell_min: float,
            eur_sell_max: float,
            session: AsyncSession
    ) -> List[CurrencyRateSchema]:
        """Поиск курсов по диапазону продажи USD/EUR."""
        ranges = {
            'usd_sell': (usd_sell_min, usd_sell_max),
            'eur_sell': (eur_sell_min, eur_sell_max)
        }
        return await cls.find_by_range_multi(ranges, session)

    @classmethod
    async def get_currency_range(cls, currency: str, operation: str, session: AsyncSession) -> Tuple[float, float]:
        """Получает диапазон цен для указанной валюты и операции."""
        field = settings.CURRENCY_FIELDS[currency][operation]
        return await cls._get_value_range(session, field)

    @classmethod
    async def bulk_update_currency(cls, session: AsyncSession, records: List[BaseModel]) -> int:
        """Массовое обновление валютных курсов."""
        разбирали ранее

    @classmethod
    async def _find_best_rate(
            cls,
            currency_type: str,
            operation: str,
            session: AsyncSession
    ) -> Optional[BestRateResponse]:
        """Находит лучший курс для указанной валюты и операции."""
        try:
            field = settings.CURRENCY_FIELDS[currency_type][operation]
            order_by = desc(field) if operation == 'sell' else field

            query = select(cls.model).order_by(order_by)
            result = await session.execute(query)
            rates = result.scalars().all()

            if not rates:
                return None

            best_value = getattr(rates[0], field)
            best_banks = [
                bank.bank_name for bank in rates
                if getattr(bank, field) == best_value
            ]

            return BestRateResponse(rate=best_value, banks=best_banks)
        except SQLAlchemyError as e:
            logger.error(f"Ошибка поиска лучшего курса: {e}")
            raise

    @classmethod
    async def find_best_purchase_rate(cls, currency_type: str, session: AsyncSession) -> Optional[BestRateResponse]:
        """Находит лучший курс покупки для указанной валюты."""
        return await cls._find_best_rate(currency_type, 'buy', session)

    @classmethod
    async def find_best_sale_rate(cls, currency_type: str, session: AsyncSession) -> Optional[BestRateResponse]:
        """Находит лучший курс продажи для указанной валюты."""
        return await cls._find_best_rate(currency_type, 'sell', session)

Разбор новых методов

1. _get_value_range

Что делает?
Метод ищет минимальное и максимальное значение для указанного поля (например, usd_buy или eur_sell) в таблице.

Как работает?

  • Выполняет запрос для получения всех значений из базы данных по указанному полю.

  • Если данные есть, возвращает минимальное и максимальное значения.

  • Если данных нет, возвращает (0.0, 0.0).

Пример:
Если мы ищем минимальный и максимальный курс покупки доллара, метод вернёт, например, (74.5, 75.8).

2. _find_by_range

Что делает?
Ищет записи в базе данных, где значения указанного поля находятся в заданном диапазоне (от min_value до max_value).

Как работает?

  • Фильтрует записи таблицы с помощью метода between (значение "между").

  • Преобразует найденные записи в формат, описанный в схеме CurrencyRateSchema.

Пример:
Если задать диапазон для usd_buy от 74.0 до 75.0, метод найдёт все записи с курсами покупки доллара в этом диапазоне.

3. find_by_range_multi

Что делает?
Ищет записи, попадающие в несколько диапазонов одновременно, для разных полей.

Как работает?

  • Проходит по всем переданным диапазонам (например, для usd_buy и eur_sell).

  • Использует метод _find_by_range для поиска по каждому полю.

  • Собирает результаты в общий список.

Пример:
Можно передать диапазоны для usd_buy (74.0–75.0) и eur_sell (84.0–85.0). Метод вернёт записи, удовлетворяющие хотя бы одному из условий.

4. find_by_purchase_range

Что делает?
Ищет курсы покупки (доллара и евро), попадающие в заданные диапазоны.

Как работает?

  • Принимает минимальные и максимальные значения для usd_buy и eur_buy.

  • Формирует диапазоны и вызывает метод find_by_range_multi.

Пример:
Задайте диапазон для курса покупки доллара (74.0–75.0) и евро (84.0–85.0), чтобы получить записи с подходящими курсами.

5. find_by_sale_range

Что делает?
Похож на метод find_by_purchase_range, но работает с диапазонами для курсов продажи (usd_sell и eur_sell).

6. get_currency_range

Что делает?
Определяет диапазон значений для конкретной валюты (usd или eur) и операции (buy или sell).

Как работает?

  • Использует поле из настроек приложения (settings.CURRENCY_FIELDS), соответствующее валюте и операции.

  • Вызывает метод _get_value_range для поиска минимума и максимума по этому полю.

Пример:
Для курса покупки доллара вызов метода вернёт диапазон, например, (74.5, 75.8).

7. _find_best_rate

Что делает?
Ищет лучший курс (минимальный для покупки или максимальный для продажи) для указанной валюты.

Как работает?

  • Выбирает поле, связанное с валютой и операцией.

  • Сортирует записи по этому полю:

    • По убыванию (для продажи).

    • По возрастанию (для покупки).

  • Возвращает объект с лучшим курсом и списком банков, предлагающих этот курс.

Пример:
Для курса продажи евро может вернуть:
{'rate': 85.5, 'banks': ['Bank1', 'Bank2']}.

8. find_best_purchase_rate

Что делает?
Находит лучший курс покупки для указанной валюты.

Как работает?

  • Вызывает метод _find_best_rate, передавая тип валюты (usd или eur) и операцию buy.

9. find_best_sale_rate

Что делает?
Находит лучший курс продажи для указанной валюты.

Как работает?
Похож на find_best_purchase_rate, но передаёт операцию sell.

Надеюсь, что с этим все понятно. Если что-то осталось непонятным, задавайте вопросы в комментариях к статье или присоединяйтесь к нашему телеграм-сообществу «Легкий путь в Python» (там уже более 1400 участников).

Создание новых API-методов

На основе новых методов SQLAlchemy создадим API-методы для взаимодействия с данными.

Метод: Получение курсов валют в диапазоне цен покупки для USD и EUR

@router.post("/currency_in_purchase_range/")
async def get_currency_in_purchase_range(
        filter_data: CurrencyRangeFilterSchema,
        user_data: User = Depends(get_current_user),
        session: AsyncSession = SessionDep
) -> List[CurrencyRateSchema]:
    """
    Возвращает курсы валют, находящиеся в заданном диапазоне цен покупки для USD и EUR.
    """
    validate_range(filter_data.usd_min, filter_data.usd_max)
    validate_range(filter_data.eur_min, filter_data.eur_max)

    currencies = await CurrencyRateDAO.find_by_purchase_range(
        usd_buy_min=filter_data.usd_min,
        usd_buy_max=filter_data.usd_max,
        eur_buy_min=filter_data.eur_min,
        eur_buy_max=filter_data.eur_max,
        session=session
    )

    if not currencies:
        raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["not_found"])
    return currencies

Хотя мы получаем данные, реализация выполнена через POST-запрос для удобства тестирования. В этом методе используются две схемы Pydantic:

  1. CurrencyRangeFilterSchema — проверяет входные данные:

    class CurrencyRangeFilterSchema(BaseModel):
        usd_min: float | None = 0
        usd_max: float | None = 0
        eur_min: float | None = 0
        eur_max: float | None = 0
    
  2. CurrencyRateSchema — проверяет корректность возвращаемых данных (обсуждалось ранее).

После валидации входных данных вызывается метод find_by_purchase_range, реализованный ранее.

Метод: Получение курсов валют в диапазоне цен продажи для USD и EUR

@router.post("/currency_in_sale_range/")
async def get_currency_in_sale_range(
        filter_data: CurrencySaleRangeFilterSchema,
        user_data: User = Depends(get_current_user),
        session: AsyncSession = SessionDep
) -> List[CurrencyRateSchema]:
    """
    Возвращает курсы валют, находящиеся в заданном диапазоне цен продажи для USD и EUR.
    """
    validate_range(filter_data.usd_sale_min, filter_data.usd_sale_max)
    validate_range(filter_data.eur_sale_min, filter_data.eur_sale_max)

    currencies = await CurrencyRateDAO.find_by_sale_range(
        usd_sell_min=filter_data.usd_sale_min,
        usd_sell_max=filter_data.usd_sale_max,
        eur_sell_min=filter_data.eur_sale_min,
        eur_sell_max=filter_data.eur_sale_max,
        session=session
    )

    if not currencies:
        raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["not_found"])
    return currencies

Используется схема CurrencySaleRangeFilterSchema для валидации входных данных:

class CurrencySaleRangeFilterSchema(BaseModel):
    usd_sale_min: float | None = 0
    usd_sale_max: float | None = 0
    eur_sale_min: float | None = 0
    eur_sale_max: float | None = 0

Остальная логика идентична предыдущему методу.

Метод: Получение информации о банках с лучшим курсом покупки для выбранной валюты

@router.get("/best_purchase_rate/{currency_type}")
async def get_best_purchase_rate(
        currency_type: str,
        user_data: User = Depends(get_current_user),
        session: AsyncSession = SessionDep
) -> BestRateResponse:
    """
    Возвращает информацию о банках с лучшим курсом покупки для выбранной валюты.
    """
    currency_type = validate_currency_type(currency_type)
    result = await CurrencyRateDAO.find_best_purchase_rate(currency_type=currency_type, session=session)

    if not result or not result.banks:
        raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["not_found"])
    return result

На вход подается строка, представляющая валюту (например, "usd" или "eur"). Это позволяет избежать избыточности в реализации.

Метод: Получение информации о банках с лучшим курсом продажи для выбранной валюты

@router.get("/best_sale_rate/{currency_type}")
async def get_best_sale_rate(
        currency_type: str,
        user_data: User = Depends(get_current_user),
        session: AsyncSession = SessionDep
) -> BestRateResponse:
    """
    Возвращает информацию о банках с лучшим курсом продажи для выбранной валюты.
    """
    currency_type = validate_currency_type(currency_type)
    result = await CurrencyRateDAO.find_best_sale_rate(currency_type=currency_type, session=session)

    if not result or not result.banks:
        raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["not_found"])
    return result

Метод идентичен предыдущему, за исключением используемого метода DAO.

Метод: Получение минимальных и максимальных цен покупки для обеих валют

@router.get("/currency_purchase_range/{currency_type}")
async def get_currency_purchase_range(
        currency_type: str,
        user_data: User = Depends(get_current_user),
        session: AsyncSession = SessionDep
) -> CurrencyRangeFilterSchema:
    """
    Возвращает минимальные и максимальные цены покупки для обеих валют.
    """
    currency_type = validate_currency_type(currency_type)
    requested_range, other_range = await get_currency_ranges(currency_type, 'buy', session)

    return CurrencyRangeFilterSchema(
        usd_min=requested_range[0] if currency_type == 'usd' else other_range[0],
        usd_max=requested_range[1] if currency_type == 'usd' else other_range[1],
        eur_min=other_range[0] if currency_type == 'usd' else requested_range[0],
        eur_max=other_range[1] if currency_type == 'usd' else requested_range[1]
    )

Эндпоинт принимает тип валюты (usd или eur) и возвращает минимальные и максимальные цены покупки для обеих валют.

Метод: Получение минимальных и максимальных цен продажи для обеих валют

@router.get("/currency_sale_range/{currency_type}")
async def get_currency_sale_range(
        currency_type: str,
        user_data: User = Depends(get_current_user),
        session: AsyncSession = SessionDep
) -> CurrencySaleRangeFilterSchema:
    """
    Возвращает минимальные и максимальные цены продажи для обеих валют.
    """
    currency_type = validate_currency_type(currency_type)
    requested_range, other_range = await get_currency_ranges(currency_type, 'sell', session)

    return CurrencySaleRangeFilterSchema(
        usd_sale_min=requested_range[0] if currency_type == 'usd' else other_range[0],
        usd_sale_max=requested_range[1] if currency_type == 'usd' else other_range[1],
        eur_sale_min=other_range[0] if currency_type == 'usd' else requested_range[0],
        eur_sale_max=other_range[1] if currency_type == 'usd' else requested_range[1]
    )

Логика похожая и большого внимания не заслуживает.

Теперь я предлагаю вам сделать следующее: перезапустите сервис и попробуйте использовать созданные методы API в автоматически созданной документации FastApi. Помните, что все методы будут требовать либо прав администратора, либо простой авторизации в системе.

Если все тесты прошли успешно, то следующим шагом станет размещение нашего приложения FastApi на сервисе Amvera Cloud.

После того как мы завершим этот процесс, у нас будет возможность интегрировать наш API в различные системы, такие как сайты, мобильные приложения, телеграм-боты и другие, а также проводить тестирование в реальных условиях, отправляя асинхронные запросы через aiohttp.

Деплой API на Amvera Cloud

Я упоминал, что деплой займет всего 5 минут. Сейчас докажу это на практике.

Шаг 1: Создание файла конфигурации amvera.yml

На одном уровне с файлами .env и requirements.txt создайте файл amvera.yml. Этот файл содержит настройки, которые помогут Amvera Cloud собрать и запустить ваш проект. Пример настроек:

meta:
  environment: python
  toolchain:
    name: pip
    version: 3.12
build:
  requirementsPath: requirements.txt
run:
  persistenceMount: /data
  containerPort: 8000
  command: uvicorn app.main:app --host 0.0.0.0 --port 8000

Описание настроек:

  • meta: Определяет среду выполнения (Python) и инструмент для управления зависимостями (pip).

  • build: Указывает путь к файлу requirements.txt для установки зависимостей.

  • run: Настраивает путь для сохранения данных (папка /data), порт контейнера (8000), и команду запуска приложения через uvicorn.

Шаг 2: Регистрация и создание проекта на Amvera Cloud

  1. Регистрация: Зайдите на сайт Amvera Cloud и зарегистрируйтесь, если у вас еще нет аккаунта. Новые пользователи получают 111 рублей на основной баланс.

  2. Создание проекта: Нажмите кнопку «Создать проект».

  3. Настройка проекта:

    • Укажите имя проекта.

    • Выберите тарифный план. Для текущего проекта подойдет тариф «Начальный». Нажмите «Далее».

  4. Загрузка файлов:

    • На следующем шаге выберите способ доставки файлов: через GIT или загрузку вручную.

    • Для простоты используйте интерфейс загрузки: перетащите все файлы проекта «как есть». Убедитесь, что добавлены файлы amvera.yml и requirements.txt. Нажмите «Далее».

  5. Проверка конфигурации: На следующем экране проверьте корректность настроек и подтвердите их.

Шаг 3: Настройка домена

  1. Войдите в созданный проект и перейдите на вкладку «Настройки».

  2. Нажмите «Добавить домен»:

    • Выберите HTTPS.

    • Укажите бесплатный домен Amvera или добавьте собственный домен (например, с REG.RU), если у вас уже есть зарегистрированный домен.

Шаг 4: Ожидание деплоя

Через 2–3 минуты проект будет развернут. Вы увидите что-то похожее на это:

Пример ссылки на документацию проекта:

Перейдите по выделенной ссылке, чтобы убедиться, что всё работает корректно.

Если всё сделано правильно, API готов к использованию в боевом режиме!

Тестируем API

Вы, вероятно, ожидаете увидеть здесь Pytest, но я решил не усложнять и написать несколько асинхронных функций на Aiohttp для тестирования нашего API. Такой подход будет понятен как новичкам, так и опытным Python-разработчикам.

Для этого в корне проекта я создам файл api_test.py.

Сначала выполним необходимые импорты и назначим переменные, которые позволят нам работать с запросами:

import aiohttp
from loguru import logger
from asyncio import run

BASE_SITE = 'https://bankiru-yakvenalex.amvera.io/'
TAG_AUTH = 'auth/'
TAG_API = 'api/'

headers = {
    "accept": "application/json",
    "Content-Type": "application/json",
}

Через aiohttp мы будем выполнять асинхронные запросы к нашему API, через logger будем выводить информацию после запросов в консоль, а при помощи run будем запускать асинхронный код.

Теперь напишем первую функцию для регистрации пользователя в системе:

async def register_user(
    email: str,
    phone_number: str,
    first_name: str,
    last_name: str,
    password: str,
    confirm_password: str
):
    """Выполняет POST-запрос на регистрацию пользователя."""
    url = f"{BASE_SITE}/{TAG_AUTH}/register/"
    payload = {
        "email": email,
        "phone_number": phone_number,
        "first_name": first_name,
        "last_name": last_name,
        "password": password,
        "confirm_password": confirm_password,
    }
    
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url, headers=headers, json=payload) as response:
                response_data = await response.json()
                logger.info(response_data)
                return response_data
        except aiohttp.ClientError as e:
            logger.error(f"Ошибка запроса: {e}")
            return None

Проверим:

run(register_user(
    email="alex@rambler.ru",
    phone_number="+79001234567",
    first_name="Alex",
    last_name="Ivanov",
    password="securePassword123",
    confirm_password="securePassword123"
))

Теперь напишем функцию для авторизации в системе:

async def login_user(email: str, password: str):
    """Выполняет POST-запрос для авторизации пользователя."""
    url = f"{BASE_SITE}/{TAG_AUTH}/login/"
    payload = {
        "email": email,
        "password": password,
    }
    
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url, headers=headers, json=payload) as response:
                response_data = await response.json()
                if response_data:
                    if response_data.get("ok"):
                        logger.success(f"Access token: {response_data['access_token']}")
                    else:
                        logger.warning(f"Ошибка входа: {response_data.get('message')}")
                return response_data
        except aiohttp.ClientError as e:
            logger.error(f"Ошибка запроса: {e}")
            return None

Выполним с некорректными данными:

run(login_user(
    email="alex@rambler.ru",
    password="securePassword1232"
))

Ответ:

2024-11-20 12:13:22.114 | WARNING | __main__:login_user:66 - Ошибка входа

Теперь с корректными данными. Ответ:

2024-11-20 12:13:58.531 | SUCCESS | __main__:login_user:64 - Access token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI3IiwiZXhwIjoxNzM0Njg2MDM4fQ.Tl_wF1cnTDxLslNkk5VJm_3-2xLcKUEIUX_odldKims

Давайте сохраним этот токен в переменную USER_TOKEN. Теперь предлагаю выполнить вход с учетной записи, где есть права админа, и сохранить значение токена в переменную ADMIN_TOKEN. Эти две переменные нам будут нужны на этапе тестирования методов, требующих авторизацию в системе.

Теперь напишем метод для получения информации по валютам от всех банков. Сейчас покажу вам, как можно использовать JWT-токен извне, передавая его в куки. По этой технологии работает большинство современных API (Bearer):

async def get_all_currency(access_token: str, method_name='all_currency'):
    """Выполняет запрос для получения всех валют."""
    url = f"{BASE_SITE}/{TAG_API}/{method_name}/"
    headers['cookie'] = f"users_access_token={access_token}"
    
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, headers=headers) as response:
                response_data = await response.json()
                logger.info(f"Статус: {response.status}")
                return response_data
        except aiohttp.ClientError as e:
            logger.error(f"HTTP request failed: {e}")
            return None

Данная функция принимает на вход токен пользователя и имя метода. Аргумент с именем метода я ввел по причине того, что получение информации по всем курсам банков и для пользователя, и для админа работает по одним правилам. Меняется только имя метода и токен.

Тут мы в словарь headers добавляем ключ cookie, а в его значение подставляем f"users_access_token={access_token}".

При проблемах с доступом (например, некорректный токен) система выдаст ошибку 401.

Давайте выполним запрос с корректным токеном, а полученный результат уже прогоним через цикл for:

rez = run(get_all_currency(access_token=USER_TOKEN))
for i in rez:
    logger.info(i)

Таким образом, нам удалось получить актуальные данные с нашего API, который сейчас размещен на серверах Amvera.

Теперь попробуем выполнить запрос для получения информации от админа, но с токеном пользователя:

rez = run(get_all_currency(access_token=USER_TOKEN, method_name='all_currency_admin'))

Тут я получаю сообщение "нет прав":

{'detail': 'Недостаточно прав!'}

При замене на ADMIN_TOKEN получаю нужную информацию.

По остальным методам не вижу смысла писать функции, так как общая логика должна быть понятна.

Заключение

В этой статье мы создали простой, но функциональный API с набором универсальных методов. Моя цель состояла не в том, чтобы разработать сложный API с уникальными функциями, а в том, чтобы помочь вам закрепить практические навыки работы с FastAPI, полученные в теоретических статьях.

Что мы изучили

  • Интеграцию APScheduler в проекты на FastAPI — инструмент, который часто применяется в реальных проектах

  • Практическое применение теоретических знаний FastAPI

  • Базовые принципы построения API

Что дальше?

В более сложных проектах вместо APScheduler обычно применяется Celery. Если будет достаточно интереса и отклика, в следующих статьях я планирую обсудить технологию Celery и Redis в контексте проектов FastApi.

Полезные ресурсы

Если вы хотите получать больше эксклюзивного контента, приглашаю вас в мой бесплатный Telegram-канал «Легкий путь в Python», где уже более 1400 единомышленников!

Если статья оказалась полезной, буду признателен за лайк и комментарий.

До новых встреч!

Комментарии (9)


  1. pastop
    21.11.2024 05:34

    Добрый день, спасибо за подробный мануал, подскажите пожалуйста, как будет работать APScheduler при использовании uvicorn --workers 4 (как пример), у меня запускалось 4 экземпляра и победить это не удалось, пришлось вывести в отдельный сервис


    1. yakvenalex Автор
      21.11.2024 05:34

      В случае с несколькими воркерами задача становится немного сложнее, поскольку каждый из них запускает свой собственный экземпляр приложения. Здесь можно рассмотреть два варианта решения проблемы:

      1. Инициализация планировщика только в одном воркере: Чтобы избежать дублирования задач, можно использовать проверку на наличие существующего экземпляра планировщика. Например, можно использовать переменную окружения или файл блокировки.

      2. Использование Redis или другой базы данных для хранения состояния задач: Это позволит вам управлять задачами более централизованно и избежать их дублирования.

      Для более сложных проектов обычно используется Celery.


      1. pastop
        21.11.2024 05:34

        Очень бы хотелось посмотреть на работу брокера, может быть даже с использованием FastStream, благо что автор присутствует на хабре, рассмотрите вариант?


    1. diman40
      21.11.2024 05:34

      Как вариант: не делать таких гибридов а разделять на FastAPI Application и scheduler. Второй сервис запускать в одном экземпляре.


      1. pastop
        21.11.2024 05:34

        Так и сделал, и даже подобие управления есть т.к. БД для FastAPI и APScheduler общая, но не красиво же :)


        1. diman40
          21.11.2024 05:34

          Можно снизить градус отторжения того, что одну БД разделяют два сервиса (но, по сути, они родные братья): scheduler лезет при необходимости в БД только readonly, а если меняет что то в общих с app данных, то делает это только через api-вызов к app.


          1. pastop
            21.11.2024 05:34

            с APScheduler вроде так не получится, он сам в БД пишет id задачи и время запуска, но в общем это не критично, работает и слава богу.


            1. yakvenalex Автор
              21.11.2024 05:34

              Почему нельзя? APScheduler запускает отдельные функции. Если вы напишите функцию на основе того-же aiohttp, то свободно сможете заходить в базу через API-методы. Вполне себе рабочая схема)


              1. pastop
                21.11.2024 05:34

                Возможно я не корректно выразился, но APScheduler  для своей работы может использовать БД (в моем случае Postgres) и он сам записывает id задания и время исполнения в свою таблицу, т.о. для APScheduler нельзя выставить RO, он работать не сможет.
                В общем то оно не критично, по крайней мере в моем частном случае, и живет как отдельный сервис, хотя для меня не не самое "красивое" решение.