Друзья, приветствую! К сожалению, не хватает времени, чтобы чаще публиковаться, и надеюсь, что вы ждали этой статьи.
Как вы поняли из названия, сегодня мы поговорим про авторизацию и аутентификацию. Прежде чем вы приступите к прочтению статьи, настоятельно рекомендую вам ознакомиться с прошлым материалом:
Создание собственного API на Python (FastAPI): Знакомство и первые функции (часть 1)
Создание собственного API на Python (FastAPI): Гайд по POST, PUT, DELETE запросам и моделям Pydantic (часть 2)
Без знаний, описанных там, вам вряд ли удастся разобраться, что к чему. Прежде чем мы приступим к написанию кода, давайте немного теории.
Изначально я планировал рассказать вам о работе с готовыми модулями для авторизации, но в процессе решил, что вам будет интереснее написать свою собственную авторизацию со своими полями, ролями и прочее.
Аутентификация
Аутентификация — это процесс проверки подлинности пользователя. Он подтверждает, что пользователь действительно тот, за кого себя выдает.
Когда вы вводите логин и пароль в форму авторизации, на самом деле происходит процесс аутентификации. Система проверяет, существует ли пользователь, корректный ли его пароль, и только если все условия выполнены, происходит аутентификация.
Авторизация
Авторизация — это процесс предоставления пользователю прав доступа к определенным ресурсам или действиям. Она определяет, что пользователь может и не может делать в системе.
Авторизация может быть и без аутентификации. К примеру, любая свободная доска объявлений: вы, не выполнив аутентификацию, можете читать объявления, а в некоторых случаях забирать контактные данные.
На том же примере: иногда доступ к контактам будет открыт только после входа в систему (после аутентификации), как и размещение объявлений.
Более серьезные темы работы с авторизацией включают доступ к админ-панели. В зависимости от вашего уровня доступа вам будет открыт тот или иной функционал или предоставлена определенная скрытая информация.
Для реализации обычно пользователям присваиваются определенные роли, и в зависимости от роли каждому предоставляются те или иные права. Кто-то может всё, например, админ, а кому-то можно только оставить комментарии (авторизованный пользователь).
Сегодня мы создадим такие роли и я покажу вам, как открывать (давать доступ) к данным или функционалу в зависимости от роли пользователя.
Связанные понятия
Вместе с этими темами в FastAPI тесно связаны следующие понятия:
Dependencies
Dependencies (зависимости) — это механизм FastAPI, который позволяет легко внедрять зависимости в маршруты. Это могут быть как простые значения, так и сложные объекты, такие как базы данных или другие сервисы.
Объясню проще: например, такой зависимостью может быть результат выполнения той или иной функции. К примеру, это проверка, авторизован ли пользователь. Тогда мы передадим аргументом Dependencies вместе с функцией, которая должна выполниться ранее. И только после успешного её выполнения откроется доступ к новой функции.
Примечательно, что зависимостей может быть несколько, и сегодня, на конкретных примерах, вы узнаете, как работать с этим механизмом.
Вместо долгих теоретических объяснений, мы вскоре познакомимся с зависимостями на конкретных примерах и опишем их в коде.
Hashing
Hashing — процесс преобразования данных (например, пароля) в уникальный код фиксированной длины. В FastAPI часто используется для безопасного хранения паролей.
Сегодня мы будем захватывать пароль пользователя, после чего трансформируем его в hash и запишем в базу данных уже hash-строку. Также мы напишем метод, позволяющий проверять подлинность пароля. Он будет принимать пароль и hash и проверять, соответствуют ли они друг другу.
JWT токены
JWT токены: JSON Web Token (JWT) — это компактный, URL-ориентированный способ представления информации, которую можно использовать для передачи данных между двумя сторонами. В контексте авторизации JWT часто используется для создания токенов доступа, которые могут проверяться сервером. Сегодня мы научимся генерировать такие токены, помещать их в куки и доставать с них данные.
Будет рассмотрена только тема access_token (без refresh). Причин тут 2:
Не хочу перегружать статью, а то снова получится на 30+ минут
Последняя статья вообще не зашла из чего я сделал вывод, что тема FastApi особо вам не интересна, а значит тратить несколько дней на одну статью смысла нет.
Куки
Куки — это небольшие данные, которые сервер отправляет браузеру. Каждый раз, когда браузер запрашивает страницу с того же сервера, он отправляет куку обратно. Это позволяет сохранять состояние, например, оставаться залогиненным.
Сегодня я покажу вам как в куки поместить JWT acess_token и как его оттуда достать.
Теперь, когда у нас есть общее представление о теории, давайте приступим к написанию кода.
Подготовка
Для начала создадим отдельный роутер (папку) и назовем его users. В самой папке необходимо будет добавить несколько файлов. Структура должна получиться такой:
my_fastapi_project/
├── tests/
│ └── (тут мы будем добавлять функции для Pytest)
├── app/
│ ├── database.py
│ ├── config.py
│ ├── main.py
│ └── students/
│ ├── router.py
│ ├── schemas.py
│ ├── dao.py
│ ├── rb.py
│ └── users/
│ ├── router.py
│ ├── dependencies.py
│ ├── auth.py
│ ├── schemas.py
│ ├── dao.py
│ └── dao/
│ └── base.py
│ └── migration/
│ └── (файлы миграций Alembic)
├── alembic.ini
├── .env
└── requirements.txt
То есть, новый роутер должен выглядеть так:
│ └── users/
│ ├── router.py
│ ├── dependencies.py
│ ├── auth.py
│ ├── schemas.py
│ ├── dao.py
Из нового у нас 2 файла: auth.py и dependencies.py. Для начала рассмотрим auth.py:
В этот файл нам необходимо будет прописать следующие методы:
метод для создания JWT access_token
метод для трансформации пароля в hash строку
метод для проверки соответствия пароля и hash-строки
метод который будет принимать Email (логин) и пароль от пользователя. Суть в том чтоб выполнить аутинтефикацию.
Давайте по порядку.
Необходимо установить следующие библиотеки:
python-jose
bcrypt==4.0.1
passlib[bcrypt]
Важно, чтоб версия bcrypt была именно 4.0.1
pip install python-jose bcrypt==4.0.1 passlib[bcrypt]
python-jose:
Это библиотека для работы с JSON Web Tokens (JWT). Она поддерживает различные алгоритмы шифрования и подписания для создания и проверки JWT.
Используется для генерации, подписания и верификации токенов, которые часто используются для аутентификации и передачи информации между клиентом и сервером.
bcrypt==4.0.1:
Это библиотека для хэширования паролей с использованием алгоритма bcrypt.
Важно установить именно версию 4.0.1, чтобы избежать возможных несовместимостей или изменений в API в других версиях.
bcrypt считается одним из наиболее безопасных методов хэширования паролей благодаря использованию соли и многократного хэширования.
passlib[bcrypt]:
Это универсальная библиотека для хэширования паролей, которая поддерживает различные алгоритмы хэширования, включая bcrypt.
Passlib упрощает работу с хэшированием паролей, предоставляя удобные функции для создания и проверки хэшей.
Использование passlib[bcrypt] означает, что вы устанавливаете Passlib с поддержкой алгоритма bcrypt, обеспечивая дополнительную гибкость и функциональность.
Давайте напишем логику для работы с паролями
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
Импорт библиотеки:
from passlib.context import CryptContext
Импортируется CryptContext из библиотеки Passlib, который будет использоваться для настройки и управления хэшированием паролей.
Создание контекста для хэширования паролей:
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
CryptContext настраивается для использования алгоритма bcrypt.
Параметр deprecated="auto" указывает использовать рекомендованные схемы хэширования и автоматически обновлять устаревшие.
Функция для создания хэша пароля:
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
Функция принимает пароль в виде строки и возвращает его безопасный хэш.
Функция для проверки пароля:
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
Функция принимает обычный пароль и его хэш, возвращая True, если пароль соответствует хэшу, и False в противном случае.
Думаю что в этой части все понятно. Теперь напишем функцию для генерации JWT токена.
from jose import jwt
from datetime import datetime, timedelta, timezone
from app.config import get_auth_data
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(days=30)
to_encode.update({"exp": expire})
auth_data = get_auth_data()
encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])
return encode_jwt
Давайте разберемся с get_auth_data. Тут мы просто должны добавить некоторые параметры в наш .env файл и после поместить эти данные в настройки.
.env
DB_HOST=localhost
DB_PORT=5433
DB_NAME=fast_api
DB_USER=amin
DB_PASSWORD=my_super_password
SECRET_KEY=gV64m9aIzFG4qpgVphvQbPQrtAO0nM-7YwwOvu0XPt5KJOjAy4AfgLkqJXYEt
ALGORITHM=HS256
Обратите внимание, что у нас появился SECRET и ALGORITHM.
После правок файл с настройками будет выглядеть так:
import os
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
DB_HOST: str
DB_PORT: int
DB_NAME: str
DB_USER: str
DB_PASSWORD: str
SECRET_KEY: str
ALGORITHM: str
model_config = SettingsConfigDict(
env_file=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".env")
)
settings = Settings()
def get_db_url():
return (
f"postgresql+asyncpg://{settings.DB_USER}:{settings.DB_PASSWORD}@"
f"{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
)
def get_auth_data():
return {"secret_key": settings.SECRET_KEY, "algorithm": settings.ALGORITHM}
Тут мы добавили метод, который будет возвращать эти два параметра. О том как оно все тут работает — я писал в прошлых статьях подробно. Теперь разберем сам код:
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(days=30)
to_encode.update({"exp": expire})
auth_data = get_auth_data()
encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])
return encode_jwt
Функция create_access_token создает JSON Web Token (JWT) для аутентификации пользователей. Она принимает словарь с данными, добавляет время истечения токена (по умолчанию 30 дней), и затем кодирует эти данные в JWT с использованием секретного ключа и алгоритма шифрования, заданных в конфигурации приложения.
Далее, мы сможем поместить данный токен в куки, после чего начать его считывать. Поместить в него можно и другие данные.
В результате считывания мы сможем понимать следующее:
Истек ли токен. Если да, то мы можем автоматически «выбить» пользователя из системы
Принимаем ID пользователя или прочие данные о нем для дальнейшей работы. Например, для предоставления пользователю того или иного функционала (данных).
В серьезных приложениях для того чтоб пользователя не выкидывало из системы реализуется функционал refresh_token. Если коротко, то refresh_token позволяет автоматически обновить срок жизни токена, тем самым, не заставляя пользователя повторно входить в систему.
Я понимаю, что сейчас не все ясно, но, совсем скоро, все станет на свои места.
Таблица пользователей.
Данные о пользователях, как и их права, нам нужно где-то хранить. Так давайте создадим соответствующую модель в файле users/models.py:
from sqlalchemy import text
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base, str_uniq, int_pk
class User(Base):
id: Mapped[int_pk]
phone_number: Mapped[str_uniq]
first_name: Mapped[str]
last_name: Mapped[str]
email: Mapped[str_uniq]
password: Mapped[str]
is_user: Mapped[bool] = mapped_column(default=True, server_default=text('true'), nullable=False)
is_student: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)
is_teacher: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)
is_admin: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)
is_super_admin: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)
extend_existing = True
def __repr__(self):
return f"{self.__class__.__name__}(id={self.id})"
В статье «Создание собственного API на Python (FastAPI): Router и асинхронные запросы в PostgreSQL (SQLAlchemy)» я подробно рассматривал тему создания таблиц (моделей) и работу с ними.
Тут мы описали нашу будущую таблицу. Из «необычного» тут есть следующие поля: is_user, is_student, is_teacher, is_admin, is_super_admin. Как вы понимаете, это ни что иное как роли пользователей.
По умолчанию мы присвоим каждому авторизованному пользователю роль is_user, но, в последствии, вы сможете изменить роль пользователю, тем самым, открывая ему доступ к новому функционалу или к особым данным.
Не забываем привязать новую таблицу в migration/env.py
from app.users.models import User
Выполняем миграцию (ревизию)
alembic revision --autogenerate -m "create users table"
Выполняем upgrade
alembic upgrade head
В результате у вас должна появиться таблица такого вида
Отлично! Теперь можем приступить к описанию самого роутера.
Нам необходим создать Router и напишем метод для регистрации пользователя.
from fastapi import APIRouter, HTTPException, status
from app.users.auth import get_password_hash
from app.users.dao import UsersDAO
from app.users.schemas import SUserRegister
router = APIRouter(prefix='/auth', tags=['Auth'])
@router.post("/register/")
async def register_user(user_data: SUserRegister) -> dict:
user = await UsersDAO.find_one_or_none(email=user_data.email)
if user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail='Пользователь уже существует'
)
user_dict = user_data.dict()
user_dict['password'] = get_password_hash(user_data.password)
await UsersDAO.add(**user_dict)
return {'message': 'Вы успешно зарегистрированы!'}
Давайте разбираться.
Сразу мы видим, что я импортировал схему SuserRegister и новый класс UsersDAO. Если вы читали прошлые статьи по теме FastApi, то знаете, что SuserRegister, это, скорее всего Pydantic модель, описывающая тело post запроса, а UsersDAO — это класс для взаимодействия с таблицей пользователей.
Схема SuserRegister:
from pydantic import BaseModel, EmailStr, Field, validator
import re
class SUserRegister(BaseModel):
email: EmailStr = Field(..., description="Электронная почта")
password: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков")
phone_number: str = Field(..., description="Номер телефона в международном формате, начинающийся с '+'")
first_name: str = Field(..., min_length=3, max_length=50, description="Имя, от 3 до 50 символов")
last_name: str = Field(..., min_length=3, max_length=50, description="Фамилия, от 3 до 50 символов")
@validator("phone_number")
@classmethod
def validate_phone_number(cls, value: str) -> str:
if not re.match(r'^\+\d{5,15}$', value):
raise ValueError('Номер телефона должен начинаться с "+" и содержать от 5 до 15 цифр')
return value
Подобное мы уже писали. В данной теме мы сообщаем, что ждем мы email, password, phone_number, first_name и last_name. Каждое из полей обязательно для заполнения.
Файл users/dao.py
from app.dao.base import BaseDAO
from app.users.models import User
class UsersDAO(BaseDAO):
model = User
Я решил не усложнять и просто наследовался от базового класса BaseDAO. То есть использовать мы будем только универсальные методы. Подробно об этом писал в прошлой статье.
Вернемся к обработчику POST запроса:
@router.post("/register/")
async def register_user(user_data: SUserRegister) -> dict:
user = await UsersDAO.find_one_or_none(email=user_data.email)
if user:
raise HTTPException(status_code=status.HTTP_409_CONFLICT,
detail='Пользователь уже существует')
user_dict = user_data.dict()
user_dict['password'] = get_password_hash(user_data.password)
await UsersDAO.add(**user_dict)
return {'message': f'Вы успешно зарегистрированы!'}
Тут мы принимаем данные от пользователя после регистрации, затем делаем проверку на то существует ли он в базе данных. Если не существует и никаких ошибок валидации нет, то мы записываем пользователя в базу дынных.
По ошибкам (исключениям) я подготовлю в своем телеграмм канале небольшой эксклюзивный гайд. Покажу как вынести исключения в отдельный файл, какие исключения бывают и как с ними проще работать. Так же я расскажу и о том, зачим импортировать status из FastApi.
Предварительно, перед записью, мы трансформируем пароль в hash-строку. Вроде бы не сильно сложно, правда?
Регистрируем роутер в файле main.py:
from fastapi import FastAPI
from app.students.router import router as router_students
from app.majors.router import router as router_majors
from app.users.router import router as router_users
app = FastAPI()
@app.get("/")
def home_page():
return {"message": "Привет, Хабр!"}
app.include_router(router_users)
app.include_router(router_students)
app.include_router(router_majors)
Давайте тестировать. Запускаем из корня проекта:
uvicorn app.main:app
Попробуем передать данные о пользователе.
{
"email": "user@example.com",
"password": "super_password",
"phone_number": "+874493831",
"first_name": "Алексей",
"last_name": "Яковенко"
}
Видим, что мы зарегистрированы. Проверим что там у нас в базе данных.
Видим что данные записаны, а пароль трансформирован в hash-строку. Неплохое начало!
Логично что следующий шаг это аутинтификация пользователя в базе данных.
И тут нам необходимо будет провернуть один трюк. Наша задача в том, чтоб после успешного входа в систему (аутинтефикация) мы сгенирировали JWT токен и поместили его в куки.
Для удобства давайте в файле auth.py напишем функцию, которая будет принимать Email и пароль, а дальше просто будет проверять есть ли такой пользователь и с таким паролем в базе данных. Без всяких там токенов и куки.
async def authenticate_user(email: EmailStr, password: str):
user = await UsersDAO.find_one_or_none(email=email)
if not user or verify_password(plain_password=password, hashed_password=user.password) is False:
return None
return user
Тут мы попытались получить данные о пользователе по email. Затем, если пользователь с таким email получен, мы проверяем соответствует ли тот пароль что передал пользователь — hash-строке.
Если все условия выполнены — вернем данные о пользователе. Иначе вернем None.
Кроме того, давайте напишем Pydantic модель для авторизации пользователя:
class SUserAuth(BaseModel):
email: EmailStr = Field(..., description="Электронная почта")
password: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков")
Тем самым мы укажем, что в теле POST запроса мы ожидаем email и password.
Теперь можем написать и сам обработчик POST запроса:
@router.post("/login/")
async def auth_user(response: Response, user_data: SUserAuth):
check = await authenticate_user(email=user_data.email, password=user_data.password)
if check is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail='Неверная почта или пароль')
access_token = create_access_token({"sub": str(check.id)})
response.set_cookie(key="users_access_token", value=access_token, httponly=True)
return {'access_token': access_token, 'refresh_token': None}
Мы использовали тут новый параметр response.
response здесь представляет объект Response, который используется для управления HTTP-ответом, отправляемым клиенту. Он позволяет установить заголовки ответа, установить куки и так далее.
Далее мы просто выполняем проверку: получили мы пользователя или нет.
Если не получили, то вернем ошибку:
HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Неверная почта или пароль')
Тут я специально явно не указывал ошибка в почте или в пароле для безопасности и защиты от подборов. Злоумышленник не будет знать существует ли пользователь с указанным логином или нет.
Если данные о пользователе получены, то мы генерируем JWT токен, а затем записываем его в куку.
response.set_cookie(key="users_access_token", value=access_token, httponly=True)
Флаг httponly=True
, установленный при установке куки с помощью метода response.set_cookie
, указывает браузеру, что куки должны быть доступны только через HTTP или HTTPS, и не могут быть доступны скриптам JavaScript на стороне клиента.
Это повышает безопасность приложения, так как куки, содержащие чувствительные данные, такие как токены аутентификации (access_token
), не могут быть скомпрометированы через атаки XSS (межсайтовый скриптинг).
Таким образом, флаг httponly=True
помогает защитить данные пользователя от несанкционированного доступа и использования.
В целом все просто и сейчас, после авторизации, я вам покажу как будет выгядеть запись в куке.
После мы возвращаем {'access_token': access_token, 'refresh_token': None}
.
Тестируем.
Намеренно допустим ошибку
Теперь укажем данные корректно:
Видим что получили ответ. Что там по кукам?
Мы видим, что в текущих куках появился ключ users_access_token с созданным токеном в значение. Отлично, ведь это то что нам было нужно!
Далее нам необходимо написать такой механизм, который позволит нам смотреть в куки и проверять, есть ли вообще ключ users_access_token, валиден ли этот ключ и так далее и тут начинается более сложная тема.
Dependencies на практике
В начале статьи я уже говорил о том что такое dependencies (зависимости), сейчас же мы познакомимся с ними на практике, но, перед этим, давайте напишем несколько сопутствующих функций.
Напишем функцию, которая позволит достать токен:
from fastapi import Request, HTTPException, status, Depends
def get_token(request: Request):
token = request.cookies.get('users_access_token')
if not token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Token not found')
return token
Смысл данного кода в том, чтоб достать значение ключа users_access_token из куки. Дополнительных проверок пока нет. Мы или получим некоторое значение или вызовем исключение, мол токен не найден.
А теперь давайте напишем функцию, которая будет зависеть от функции get_token. Смысл зависимости тут будет сводится к тому, чтоб или выполнить функцию, если мы получим токен или сразу вызвать исключение.
Отправляю полный код:
from fastapi import Request, HTTPException, status, Depends
from jose import jwt, JWTError
from datetime import datetime, timezone
from app.config import get_auth_data
from app.exceptions import TokenExpiredException, NoJwtException, NoUserIdException, ForbiddenException
from app.users.dao import UsersDAO
def get_token(request: Request):
token = request.cookies.get('users_access_token')
if not token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Token not found')
return token
async def get_current_user(token: str = Depends(get_token)):
try:
auth_data = get_auth_data()
payload = jwt.decode(token, auth_data['secret_key'], algorithms=[auth_data['algorithm']])
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен не валидный!')
expire = payload.get('exp')
expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)
if (not expire) or (expire_time < datetime.now(timezone.utc)):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')
user_id = payload.get('sub')
if not user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Не найден ID пользователя')
user = await UsersDAO.find_one_or_none_by_id(int(user_id))
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found')
return user
Декодер:
try:
auth_data = get_auth_data()
payload = jwt.decode(token, auth_data['secret_key'], algorithms=[auth_data['algorithm']])
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен не валидный!')
В этом месте мы создали свой декодер. Его смысл в том, чтоб получить из токена данные с которыми можно будет работать. В нашем случае это exp и sub.
Теперь проверим срок токена (истек или не истек)
expire = payload.get('exp')
expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)
if (not expire) or (expire_time < datetime.now(timezone.utc)):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')
Тут нам необходимо привести данные по дате завершения срока жизни токена в питонвский формат.
Забираем значение:
expire: str = payload.get('exp')
Трансформируем в нужный формат:
expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)
Пишем простое условие:
if (not expire) or (expire_time < datetime.now(timezone.utc)):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')
Тут смысл в том, чтоб проверить есть ли параметр срока истечения жизни токена и не является ли эта дата больше текущей. Если одно из двух условий верное, то вызывается исключение.
Далее мы проверяем есть ли параметр ID пользователя:
user_id: str = payload.get('sub')
if not user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Не найден ID пользователя')
Далее если параметр есть, то мы стараемся получить данные о пользователе:
user = await UsersDAO.find_one_or_none_by_id(int(user_id))
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found')
return user
Важно не забыть трансформировать user_id в integer.
Вот и все сложности. Завершив проверку мы или исключение (ошибку) впоймаем или получим данные о пользователе.
Как вы понимаете, все сводится к тому, чтоб выйти из функции без исключения. Если это произошло — значит все у нас получилось.
Теперь напишем обработчик GET запроса, который будет возвращать данные о пользователе авторизованному пользователю и возвращать исключение, если пользователь не авторизован.
@router.get("/me/")
async def get_me(user_data: User = Depends(get_current_user)):
return user_data
Обратите внимание, что тут мы указали зависимость от функции get_current_user. В свою очередь get_current_user зависит от get_token. На этом простом примере вы видите как можно выстраивать многоуровневые зависимости.
И, прежде чем мы приступим к тестам, давайте напишем простой обработчик, который будет выбивать пользователя из системы. Как вы понимаете, для этого нам достаточно будет удалить JWT токен из куки.
@router.post("/logout/")
async def logout_user(response: Response):
response.delete_cookie(key="users_access_token")
return {'message': 'Пользователь успешно вышел из системы'}
Тестируем.
Выходим из системы
Пробуем получить о себе данные:
Получаем ожидаемую ошибку. Выполним повторную аутинтификацию.
Повторно пробуем получить о себе данные.
Видим что данные успешно получены.
А теперь давайте напишем метод, который будет доступен только для пользователя с ролью администратора (is_admin = True). Для этого в файле dependensies.py напишем новую функцию.
async def get_current_admin_user(current_user: User = Depends(get_current_user)):
if current_user.is_admin:
return current_user
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Недостаточно прав!')
И теперь, если вы внимательно читали, то вы понимаете общую логику работы данного метода.
Напоминаю, что функция get_current_user, при успешном выполнении, возвращает данные о пользователе. В функции же get_current_admin_user мы проверяем наличие флага True в значении is_admin и только в том случае, если is_admin = True мы даем необходимые права пользователю на доступ к функционалу.
Напишем обработчик GET запроса, который вернет список из всех пользователей только администратору:
@router.get("/all_users/")
async def get_all_users(user_data: User = Depends(get_current_admin_user)):
return await UsersDAO.find_all()
Так как у меня сейчас уровень доступа is_user – я получаю следующую ошибку:
Сейчас я «руками» изменю уровень доступа, а вам советую написать обработчик POST запроса, который позволит выполнить обновление роли и доступ дайте только администратору.
Повторно выполним запрос:
Отлично, данные получены!
Проблемы и новые вызовы
Начнем с того, что не совсем правильно руками заходить в базу данных и ставить True или False под той или иной ролью.
Дальше, роли могут появляться или исчезать.
Для решения данной проблемы я специально не писал код, а хочу порекомендовать вам сделать это самостоятельно. Задача будет выглядеть так:
Создать эндпоинт для добавления / удаления ролей и сделать его доступным только админу. На входе метод пусть принимает название роли. Далее опишите DELETE и POST эндпоинты под конкретные задачи
Создать эндпоинт для изменения роли пользователя. На входе он должен принимать роль и ее новое значение. Для тех кто прям хорошо разобрался в теме, предлагаю написать специальный метод в классе UserDao который будет скидывать все флажки на False и будет устанавливать флаг True той роли, которую вы назначили пользователю. Естественно, доступ к функционалу должен быть только у администратора
В телеграмм у меня, так же, есть собществ «Легкий путь в Python — сообщество» там мы обсуждаем, в том числе, проблемы с кодингом и вместе решаем их. Присоеденяйтесь!
И есть у нас одна новая, глобальная проблема — неудобно пользоваться обычным юзерам. Сейчас поясню.
Мы с вами программисты и работа через чистое API наша стихия, но вашим продуктом должны пользоваться обычные пользователи, бизнес, а для того чтоб это было комфортно — нужен приличный фронт (визуализация апи).
Нужны формы, анимации, кнопки и прочее и, как вы уже догадались, прикручивать фронт мы будем уже в следующей статье.
Заключение
Сегодня мы рассмотрели достаточно сложную тему и, несмотря, на то что я пытался описать тут все максимально подробно — без конкретной практики усвоить данный материал будет невозможно или очень сложно. Практикуйтесь!
Далее, хочу напомнить, что полный исходник данного кода вы найдете в моем телеграмм канале «Легкий путь в Python». Там же, я выложу отдельный гайд по работе с исключениями. Я покажу в нем, как их сохранять в переменные и переиспользовать в рамках одного или нескольких проектов.
На этом пока все. До скорых встреч!
NewSouth
Проверка лишняя.
jwt.decode
уже проверяет срок жизни токена. Первыйexcept JWTError
это отловит.