Друзья, приветствую! К сожалению, не хватает времени, чтобы чаще публиковаться, и надеюсь, что вы ждали этой статьи.

Как вы поняли из названия, сегодня мы поговорим про авторизацию и аутентификацию. Прежде чем вы приступите к прочтению статьи, настоятельно рекомендую вам ознакомиться с прошлым материалом:

Без знаний, описанных там, вам вряд ли удастся разобраться, что к чему. Прежде чем мы приступим к написанию кода, давайте немного теории.

Изначально я планировал рассказать вам о работе с готовыми модулями для авторизации, но в процессе решил, что вам будет интереснее написать свою собственную авторизацию со своими полями, ролями и прочее.

Аутентификация

Аутентификация — это процесс проверки подлинности пользователя. Он подтверждает, что пользователь действительно тот, за кого себя выдает.

Когда вы вводите логин и пароль в форму авторизации, на самом деле происходит процесс аутентификации. Система проверяет, существует ли пользователь, корректный ли его пароль, и только если все условия выполнены, происходит аутентификация.

Авторизация

Авторизация — это процесс предоставления пользователю прав доступа к определенным ресурсам или действиям. Она определяет, что пользователь может и не может делать в системе.

Авторизация может быть и без аутентификации. К примеру, любая свободная доска объявлений: вы, не выполнив аутентификацию, можете читать объявления, а в некоторых случаях забирать контактные данные.

На том же примере: иногда доступ к контактам будет открыт только после входа в систему (после аутентификации), как и размещение объявлений.

Более серьезные темы работы с авторизацией включают доступ к админ-панели. В зависимости от вашего уровня доступа вам будет открыт тот или иной функционал или предоставлена определенная скрытая информация.

Для реализации обычно пользователям присваиваются определенные роли, и в зависимости от роли каждому предоставляются те или иные права. Кто-то может всё, например, админ, а кому-то можно только оставить комментарии (авторизованный пользователь).

Сегодня мы создадим такие роли и я покажу вам, как открывать (давать доступ) к данным или функционалу в зависимости от роли пользователя.

Связанные понятия

Вместе с этими темами в FastAPI тесно связаны следующие понятия:

Dependencies

Dependencies (зависимости) — это механизм FastAPI, который позволяет легко внедрять зависимости в маршруты. Это могут быть как простые значения, так и сложные объекты, такие как базы данных или другие сервисы.

Объясню проще: например, такой зависимостью может быть результат выполнения той или иной функции. К примеру, это проверка, авторизован ли пользователь. Тогда мы передадим аргументом Dependencies вместе с функцией, которая должна выполниться ранее. И только после успешного её выполнения откроется доступ к новой функции.

Примечательно, что зависимостей может быть несколько, и сегодня, на конкретных примерах, вы узнаете, как работать с этим механизмом.

Вместо долгих теоретических объяснений, мы вскоре познакомимся с зависимостями на конкретных примерах и опишем их в коде.

Hashing 

Hashing — процесс преобразования данных (например, пароля) в уникальный код фиксированной длины. В FastAPI часто используется для безопасного хранения паролей.

Сегодня мы будем захватывать пароль пользователя, после чего трансформируем его в hash и запишем в базу данных уже hash-строку. Также мы напишем метод, позволяющий проверять подлинность пароля. Он будет принимать пароль и hash и проверять, соответствуют ли они друг другу.

JWT токены

JWT токены: JSON Web Token (JWT) — это компактный, URL-ориентированный способ представления информации, которую можно использовать для передачи данных между двумя сторонами. В контексте авторизации JWT часто используется для создания токенов доступа, которые могут проверяться сервером. Сегодня мы научимся генерировать такие токены, помещать их в куки и доставать с них данные.

Будет рассмотрена только тема access_token (без refresh). Причин тут 2:

  1. Не хочу перегружать статью, а то снова получится на 30+ минут

  2. Последняя статья вообще не зашла из чего я сделал вывод, что тема FastApi особо вам не интересна, а значит тратить несколько дней на одну статью смысла нет.

Куки

Куки — это небольшие данные, которые сервер отправляет браузеру. Каждый раз, когда браузер запрашивает страницу с того же сервера, он отправляет куку обратно. Это позволяет сохранять состояние, например, оставаться залогиненным.

Сегодня я покажу вам как в куки поместить JWT acess_token и как его оттуда достать.

Теперь, когда у нас есть общее представление о теории, давайте приступим к написанию кода.

Подготовка

Для начала создадим отдельный роутер (папку) и назовем его users. В самой папке необходимо будет добавить несколько файлов. Структура должна получиться такой:

my_fastapi_project/

├── tests/
│   └── (тут мы будем добавлять функции для Pytest)
├── app/
│   ├── database.py
│   ├── config.py
│   ├── main.py
│   └── students/
│      ├── router.py
│      ├── schemas.py
│      ├── dao.py
│      ├── rb.py
│   └── users/
│      ├── router.py
│      ├── dependencies.py
│      ├── auth.py
│      ├── schemas.py
│      ├── dao.py
│   └── dao/
│       └── base.py
│   └── migration/
│       └── (файлы миграций Alembic)
├── alembic.ini
├── .env
└── requirements.txt

То есть, новый роутер должен выглядеть так:

│   └── users/
│      ├── router.py
│      ├── dependencies.py
│      ├── auth.py
│      ├── schemas.py
│      ├── dao.py

Из нового у нас 2 файла: auth.py и dependencies.py. Для начала рассмотрим  auth.py:

В этот файл нам необходимо будет прописать следующие методы: 

  • метод для создания JWT access_token

  • метод для трансформации пароля в hash строку

  • метод для проверки соответствия пароля и hash-строки

  • метод который будет принимать Email (логин) и пароль от пользователя. Суть в том чтоб выполнить аутинтефикацию.

Давайте по порядку.

Необходимо установить следующие библиотеки:

python-jose
bcrypt==4.0.1
passlib[bcrypt]

Важно, чтоб версия bcrypt была именно 4.0.1

pip install python-jose bcrypt==4.0.1 passlib[bcrypt]

python-jose:

Это библиотека для работы с JSON Web Tokens (JWT). Она поддерживает различные алгоритмы шифрования и подписания для создания и проверки JWT.

Используется для генерации, подписания и верификации токенов, которые часто используются для аутентификации и передачи информации между клиентом и сервером.

bcrypt==4.0.1:

Это библиотека для хэширования паролей с использованием алгоритма bcrypt.

Важно установить именно версию 4.0.1, чтобы избежать возможных несовместимостей или изменений в API в других версиях.

bcrypt считается одним из наиболее безопасных методов хэширования паролей благодаря использованию соли и многократного хэширования.

passlib[bcrypt]:

Это универсальная библиотека для хэширования паролей, которая поддерживает различные алгоритмы хэширования, включая bcrypt.

Passlib упрощает работу с хэшированием паролей, предоставляя удобные функции для создания и проверки хэшей.

Использование passlib[bcrypt] означает, что вы устанавливаете Passlib с поддержкой алгоритма bcrypt, обеспечивая дополнительную гибкость и функциональность.

Давайте напишем логику для работы с паролями

from passlib.context import CryptContext


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

Импорт библиотеки:

from passlib.context import CryptContext

Импортируется CryptContext из библиотеки Passlib, который будет использоваться для настройки и управления хэшированием паролей.

Создание контекста для хэширования паролей:

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

CryptContext настраивается для использования алгоритма bcrypt.

Параметр deprecated="auto" указывает использовать рекомендованные схемы хэширования и автоматически обновлять устаревшие.

Функция для создания хэша пароля:

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

Функция принимает пароль в виде строки и возвращает его безопасный хэш.

Функция для проверки пароля:

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

Функция принимает обычный пароль и его хэш, возвращая True, если пароль соответствует хэшу, и False в противном случае.

Думаю что в этой части все понятно. Теперь напишем функцию для генерации JWT токена.

from jose import jwt
from datetime import datetime, timedelta, timezone
from app.config import get_auth_data


def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(days=30)
    to_encode.update({"exp": expire})
    auth_data = get_auth_data()
    encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])
    return encode_jwt

Давайте разберемся с get_auth_data. Тут мы просто должны добавить некоторые параметры в наш .env файл и после поместить эти данные в настройки.

.env

DB_HOST=localhost
DB_PORT=5433
DB_NAME=fast_api
DB_USER=amin
DB_PASSWORD=my_super_password
SECRET_KEY=gV64m9aIzFG4qpgVphvQbPQrtAO0nM-7YwwOvu0XPt5KJOjAy4AfgLkqJXYEt
ALGORITHM=HS256

 Обратите внимание, что у нас появился SECRET и ALGORITHM.

 После правок файл с настройками будет выглядеть так:

import os
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    DB_HOST: str
    DB_PORT: int
    DB_NAME: str
    DB_USER: str
    DB_PASSWORD: str
    SECRET_KEY: str
    ALGORITHM: str

    model_config = SettingsConfigDict(
        env_file=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".env")
    )


settings = Settings()


def get_db_url():
    return (
        f"postgresql+asyncpg://{settings.DB_USER}:{settings.DB_PASSWORD}@"
        f"{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
    )

    
def get_auth_data():
    return {"secret_key": settings.SECRET_KEY, "algorithm": settings.ALGORITHM}

Тут мы добавили метод, который будет возвращать эти два параметра. О том как оно все тут работает — я писал в прошлых статьях подробно. Теперь разберем сам код:

def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(days=30)
    to_encode.update({"exp": expire})
    auth_data = get_auth_data()
    encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])
    return encode_jwt

Функция create_access_token создает JSON Web Token (JWT) для аутентификации пользователей. Она принимает словарь с данными, добавляет время истечения токена (по умолчанию 30 дней), и затем кодирует эти данные в JWT с использованием секретного ключа и алгоритма шифрования, заданных в конфигурации приложения.

Далее, мы сможем поместить данный токен в куки, после чего начать его считывать. Поместить в него можно и другие данные. 

В результате считывания мы сможем понимать следующее:

  1. Истек ли токен. Если да, то мы можем автоматически «выбить» пользователя из системы

  2. Принимаем ID пользователя или прочие данные о нем для дальнейшей работы. Например, для предоставления пользователю того или иного функционала (данных).

В серьезных приложениях для того чтоб пользователя не выкидывало из системы реализуется функционал refresh_token. Если коротко, то refresh_token позволяет автоматически обновить срок жизни токена, тем самым, не заставляя пользователя повторно входить в систему.

Я понимаю, что сейчас не все ясно, но, совсем скоро, все станет на свои места.

Таблица пользователей.

Данные о пользователях, как и их права, нам нужно где-то хранить. Так давайте создадим соответствующую модель в файле users/models.py:

from sqlalchemy import text
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base, str_uniq, int_pk


class User(Base):
    id: Mapped[int_pk]
    phone_number: Mapped[str_uniq]
    first_name: Mapped[str]
    last_name: Mapped[str]
    email: Mapped[str_uniq]
    password: Mapped[str]

    is_user: Mapped[bool] = mapped_column(default=True, server_default=text('true'), nullable=False)
    is_student: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)
    is_teacher: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)
    is_admin: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)
    is_super_admin: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)

    extend_existing = True

    def __repr__(self):
        return f"{self.__class__.__name__}(id={self.id})"

В статье «Создание собственного API на Python (FastAPI): Router и асинхронные запросы в PostgreSQL (SQLAlchemy)» я подробно рассматривал тему создания таблиц (моделей) и работу с ними.

Тут мы описали нашу будущую таблицу. Из «необычного» тут есть следующие поля: is_user, is_student, is_teacher, is_admin, is_super_admin. Как вы понимаете, это ни что иное как роли пользователей.

По умолчанию мы присвоим каждому авторизованному пользователю роль is_user, но, в последствии, вы сможете изменить роль пользователю, тем самым, открывая ему доступ к новому функционалу или к особым данным.

Не забываем привязать новую таблицу в migration/env.py

from app.users.models import User

Выполняем миграцию (ревизию)

alembic revision --autogenerate -m "create users table"

Выполняем upgrade

alembic upgrade head

В результате у вас должна появиться таблица такого вида

Отлично! Теперь можем приступить к описанию самого роутера.

Нам необходим создать Router и напишем метод для регистрации пользователя.

from fastapi import APIRouter, HTTPException, status
from app.users.auth import get_password_hash
from app.users.dao import UsersDAO
from app.users.schemas import SUserRegister


router = APIRouter(prefix='/auth', tags=['Auth'])


@router.post("/register/")
async def register_user(user_data: SUserRegister) -> dict:
    user = await UsersDAO.find_one_or_none(email=user_data.email)
    if user:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail='Пользователь уже существует'
        )
    user_dict = user_data.dict()
    user_dict['password'] = get_password_hash(user_data.password)
    await UsersDAO.add(**user_dict)
    return {'message': 'Вы успешно зарегистрированы!'}

Давайте разбираться.

Сразу мы видим, что я импортировал схему SuserRegister и новый класс UsersDAO. Если вы читали прошлые статьи по теме FastApi, то знаете, что SuserRegister, это, скорее всего Pydantic модель, описывающая тело post запроса, а UsersDAO — это класс для взаимодействия с таблицей пользователей.

Схема  SuserRegister:

from pydantic import BaseModel, EmailStr, Field, validator
import re


class SUserRegister(BaseModel):
    email: EmailStr = Field(..., description="Электронная почта")
    password: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков")
    phone_number: str = Field(..., description="Номер телефона в международном формате, начинающийся с '+'")
    first_name: str = Field(..., min_length=3, max_length=50, description="Имя, от 3 до 50 символов")
    last_name: str = Field(..., min_length=3, max_length=50, description="Фамилия, от 3 до 50 символов")

    @validator("phone_number")
    @classmethod
    def validate_phone_number(cls, value: str) -> str:
        if not re.match(r'^\+\d{5,15}$', value):
            raise ValueError('Номер телефона должен начинаться с "+" и содержать от 5 до 15 цифр')
        return value

Подобное мы уже писали. В данной теме мы сообщаем, что ждем мы email, password, phone_number, first_name и last_name. Каждое из полей обязательно для заполнения.

Файл users/dao.py

from app.dao.base import BaseDAO
from app.users.models import User

 
class UsersDAO(BaseDAO):
    model = User

Я решил не усложнять и просто наследовался от базового класса BaseDAO. То есть использовать мы будем только универсальные методы. Подробно об этом писал в прошлой статье.

Вернемся к обработчику POST запроса:

@router.post("/register/")
async def register_user(user_data: SUserRegister) -> dict:
    user = await UsersDAO.find_one_or_none(email=user_data.email)
    if user:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT,
                            detail='Пользователь уже существует')
    user_dict = user_data.dict()
    user_dict['password'] = get_password_hash(user_data.password)
    await UsersDAO.add(**user_dict)
    return {'message': f'Вы успешно зарегистрированы!'}

Тут мы принимаем данные от пользователя после регистрации, затем делаем проверку на то существует ли он в базе данных. Если не существует и никаких ошибок валидации нет, то мы записываем пользователя в базу дынных.

По ошибкам (исключениям) я подготовлю в своем телеграмм канале небольшой эксклюзивный гайд. Покажу как вынести исключения в отдельный файл, какие исключения бывают и как с ними проще работать. Так же я расскажу и о том, зачим импортировать status из FastApi.

Предварительно, перед записью, мы трансформируем пароль в hash-строку. Вроде бы не сильно сложно, правда?

Регистрируем роутер в файле main.py:

from fastapi import FastAPI
from app.students.router import router as router_students
from app.majors.router import router as router_majors
from app.users.router import router as router_users


app = FastAPI()


@app.get("/")
def home_page():
    return {"message": "Привет, Хабр!"}

  
app.include_router(router_users)
app.include_router(router_students)
app.include_router(router_majors)

Давайте тестировать. Запускаем из корня проекта:

uvicorn app.main:app

 Попробуем передать данные о пользователе.

{
  "email": "user@example.com",
  "password": "super_password",
  "phone_number": "+874493831",
  "first_name": "Алексей",
  "last_name": "Яковенко"
}

Видим, что мы зарегистрированы. Проверим что там у нас в базе данных.

Видим что данные записаны, а пароль трансформирован в hash-строку. Неплохое начало!

Логично что следующий шаг это аутинтификация пользователя в базе данных.

И тут нам необходимо будет провернуть один трюк. Наша задача в том, чтоб после успешного входа в систему (аутинтефикация) мы сгенирировали JWT токен и поместили его в куки.

Для удобства давайте в файле auth.py напишем функцию, которая будет принимать Email и пароль, а дальше просто будет проверять есть ли такой пользователь и с таким паролем в базе данных. Без всяких там токенов и куки.

async def authenticate_user(email: EmailStr, password: str):
    user = await UsersDAO.find_one_or_none(email=email)
    if not user or verify_password(plain_password=password, hashed_password=user.password) is False:
        return None
    return user

Тут мы попытались получить данные о пользователе по email. Затем, если пользователь с таким email получен, мы проверяем соответствует ли тот пароль что передал пользователь — hash-строке.

Если все условия выполнены — вернем данные о пользователе. Иначе вернем None.

Кроме того, давайте напишем Pydantic модель для авторизации пользователя:

class SUserAuth(BaseModel):
    email: EmailStr = Field(..., description="Электронная почта")
    password: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков")

Тем самым мы укажем, что в теле POST запроса мы ожидаем email и password.

Теперь можем написать и сам обработчик POST запроса:

@router.post("/login/")
async def auth_user(response: Response, user_data: SUserAuth):
    check = await authenticate_user(email=user_data.email, password=user_data.password)
    if check is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail='Неверная почта или пароль')
    access_token = create_access_token({"sub": str(check.id)})
    response.set_cookie(key="users_access_token", value=access_token, httponly=True)
    return {'access_token': access_token, 'refresh_token': None}

Мы использовали тут новый параметр response.

response здесь представляет объект Response, который используется для управления HTTP-ответом, отправляемым клиенту. Он позволяет установить заголовки ответа, установить куки и так далее.

Далее мы просто выполняем проверку: получили мы пользователя или нет.

Если не получили, то вернем ошибку:

HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Неверная почта или пароль')

Тут я специально явно не указывал ошибка в почте или в пароле для безопасности и защиты от подборов. Злоумышленник не будет знать существует ли пользователь с указанным логином или нет.

Если данные о пользователе получены, то мы генерируем JWT токен, а затем записываем его в куку.

response.set_cookie(key="users_access_token", value=access_token, httponly=True)

Флаг httponly=True, установленный при установке куки с помощью метода response.set_cookie, указывает браузеру, что куки должны быть доступны только через HTTP или HTTPS, и не могут быть доступны скриптам JavaScript на стороне клиента.

Это повышает безопасность приложения, так как куки, содержащие чувствительные данные, такие как токены аутентификации (access_token), не могут быть скомпрометированы через атаки XSS (межсайтовый скриптинг).

Таким образом, флаг httponly=True помогает защитить данные пользователя от несанкционированного доступа и использования.

В целом все просто и сейчас, после авторизации, я вам покажу как будет выгядеть запись в куке.

После мы возвращаем {'access_token': access_token, 'refresh_token': None}.

Тестируем.

Намеренно допустим ошибку

Теперь укажем данные корректно:

Видим что получили ответ. Что там по кукам?

Для входа жмем F12. Переходим на вкладку "Aplications" (приложение) и смотрим на куки
Для входа жмем F12. Переходим на вкладку "Aplications" (приложение) и смотрим на куки

Мы видим, что в текущих куках появился ключ users_access_token с созданным токеном в значение. Отлично, ведь это то что нам было нужно!

Далее нам необходимо написать такой механизм, который позволит нам смотреть в куки и проверять, есть ли вообще ключ users_access_token, валиден ли этот ключ и так далее и тут начинается более сложная тема.

Dependencies на практике

В начале статьи я уже говорил о том что такое dependencies (зависимости), сейчас же мы познакомимся с ними на практике, но, перед этим, давайте напишем несколько сопутствующих функций.

Напишем функцию, которая позволит достать токен:

from fastapi import Request, HTTPException, status, Depends


def get_token(request: Request):
    token = request.cookies.get('users_access_token')
    if not token:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Token not found')
    return token

Смысл данного кода в том, чтоб достать значение ключа users_access_token из куки. Дополнительных проверок пока нет. Мы или получим некоторое значение или вызовем исключение, мол токен не найден.

А теперь давайте напишем функцию, которая будет зависеть от функции  get_token. Смысл зависимости тут будет сводится к тому, чтоб или выполнить функцию, если мы получим токен или сразу вызвать исключение.

Отправляю полный код:

from fastapi import Request, HTTPException, status, Depends
from jose import jwt, JWTError
from datetime import datetime, timezone
from app.config import get_auth_data
from app.exceptions import TokenExpiredException, NoJwtException, NoUserIdException, ForbiddenException
from app.users.dao import UsersDAO


def get_token(request: Request):
    token = request.cookies.get('users_access_token')
    if not token:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Token not found')
    return token

  
async def get_current_user(token: str = Depends(get_token)):
    try:
        auth_data = get_auth_data()
        payload = jwt.decode(token, auth_data['secret_key'], algorithms=[auth_data['algorithm']])
    except JWTError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен не валидный!')

    expire = payload.get('exp')
    expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)
    if (not expire) or (expire_time < datetime.now(timezone.utc)):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')

    user_id = payload.get('sub')
    if not user_id:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Не найден ID пользователя')

    user = await UsersDAO.find_one_or_none_by_id(int(user_id))
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found')

    return user

Декодер:

try:
    auth_data = get_auth_data()
    payload = jwt.decode(token, auth_data['secret_key'], algorithms=[auth_data['algorithm']])
except JWTError:
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен не валидный!')

В этом месте мы создали свой декодер. Его смысл в том, чтоб получить из токена данные с которыми можно будет работать. В нашем случае это exp и sub.

Теперь проверим срок токена (истек или не истек)

expire = payload.get('exp')
expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)
if (not expire) or (expire_time < datetime.now(timezone.utc)):
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')

Тут нам необходимо привести данные по дате завершения срока жизни токена в питонвский формат.

Забираем значение:

expire: str = payload.get('exp')

Трансформируем в нужный формат:

expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)

Пишем простое условие:

if (not expire) or (expire_time < datetime.now(timezone.utc)):
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')

Тут смысл в том, чтоб проверить есть ли параметр срока истечения жизни токена и не является ли эта дата больше текущей. Если одно из двух условий верное, то вызывается исключение.

Далее мы проверяем есть ли параметр ID пользователя:

user_id: str = payload.get('sub')
if not user_id:
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Не найден ID пользователя')

Далее если параметр есть, то мы стараемся получить данные о пользователе:

user = await UsersDAO.find_one_or_none_by_id(int(user_id))
if not user:
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found')
return user

Важно не забыть трансформировать  user_id в integer.

Вот и все сложности. Завершив проверку мы или исключение (ошибку) впоймаем или получим данные о пользователе.

Как вы понимаете, все сводится к тому, чтоб выйти из функции без исключения. Если это произошло — значит все у нас получилось.

Теперь напишем обработчик GET запроса, который будет возвращать данные о пользователе авторизованному пользователю и возвращать исключение, если пользователь не авторизован.

@router.get("/me/")
async def get_me(user_data: User = Depends(get_current_user)):
    return user_data

Обратите внимание, что тут мы указали зависимость от функции  get_current_user. В свою очередь  get_current_user зависит от get_token. На этом простом примере вы видите как можно выстраивать многоуровневые зависимости.

И, прежде чем мы приступим к тестам, давайте напишем простой обработчик, который будет выбивать пользователя из системы. Как вы понимаете, для этого нам достаточно будет удалить JWT токен из куки.

@router.post("/logout/")
async def logout_user(response: Response):
    response.delete_cookie(key="users_access_token")
    return {'message': 'Пользователь успешно вышел из системы'}

Тестируем.

Выходим из системы

Пробуем получить о себе данные:

Получаем ожидаемую ошибку. Выполним повторную аутинтификацию.

Повторно пробуем получить о себе данные.

Видим что данные успешно получены.

А теперь давайте напишем метод, который будет доступен только для пользователя с ролью администратора (is_admin = True). Для этого в файле dependensies.py напишем новую функцию.

async def get_current_admin_user(current_user: User = Depends(get_current_user)):
    if current_user.is_admin:
        return current_user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Недостаточно прав!')

И теперь, если вы внимательно читали, то вы понимаете общую логику работы данного метода.

Напоминаю, что функция get_current_user, при успешном выполнении, возвращает данные о пользователе. В функции же get_current_admin_user мы проверяем наличие флага True в значении is_admin и только в том случае, если is_admin = True мы даем необходимые права пользователю на доступ к функционалу.

Напишем обработчик GET запроса, который вернет список из всех пользователей только администратору:

@router.get("/all_users/")
async def get_all_users(user_data: User = Depends(get_current_admin_user)):
    return await UsersDAO.find_all()

Так как у меня сейчас уровень доступа is_user – я получаю следующую ошибку:

Сейчас я «руками» изменю уровень доступа, а вам советую написать обработчик POST запроса, который позволит выполнить обновление роли и доступ дайте только администратору.

Повторно выполним запрос:

Отлично, данные получены!

Проблемы и новые вызовы

Начнем с того, что не совсем правильно руками заходить в базу данных и ставить True или False под той или иной ролью.

Дальше, роли могут появляться или исчезать.

Для решения данной проблемы я специально не писал код, а хочу порекомендовать вам сделать это самостоятельно. Задача будет выглядеть так:

  1. Создать эндпоинт для добавления / удаления ролей и сделать его доступным только админу. На входе метод пусть принимает название роли. Далее опишите DELETE и POST эндпоинты под конкретные задачи

  2. Создать эндпоинт для изменения роли пользователя. На входе он должен принимать роль и ее новое значение. Для тех кто прям хорошо разобрался в теме, предлагаю написать специальный метод в классе UserDao который будет скидывать все флажки на False и будет устанавливать флаг True той роли, которую вы назначили пользователю. Естественно, доступ к функционалу должен быть только у администратора

В телеграмм у меня, так же, есть собществ «Легкий путь в Python — сообщество» там мы обсуждаем, в том числе, проблемы с кодингом и вместе решаем их. Присоеденяйтесь!

И есть у нас одна новая, глобальная проблема — неудобно пользоваться обычным юзерам. Сейчас поясню.

Мы с вами программисты и работа через чистое API наша стихия, но вашим продуктом должны пользоваться обычные пользователи, бизнес, а для того чтоб это было комфортно — нужен приличный фронт (визуализация апи).

Нужны формы, анимации, кнопки и прочее и, как вы уже догадались, прикручивать фронт мы будем уже в следующей статье.

Заключение

Сегодня мы рассмотрели достаточно сложную тему и, несмотря, на то что я пытался описать тут все максимально подробно — без конкретной практики усвоить данный материал будет невозможно или очень сложно. Практикуйтесь!

Далее, хочу напомнить, что полный исходник данного кода вы найдете в моем телеграмм канале «Легкий путь в Python». Там же, я выложу отдельный гайд по работе с исключениями. Я покажу в нем, как их сохранять в переменные и переиспользовать в рамках одного или нескольких проектов.

На этом пока все. До скорых встреч!

Комментарии (1)


  1. NewSouth
    18.07.2024 11:23
    +3

    if (not expire) or (expire_time < datetime.now(timezone.utc)): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')

    Проверка лишняя. jwt.decode уже проверяет срок жизни токена. Первый except JWTErrorэто отловит.

    from time import sleep
    from datetime import datetime, timezone, timedelta
    
    from jose import jwt
    
    
    def create_access_token(data: dict) -> str:
        to_encode = data.copy()
        expire = datetime.now(timezone.utc) + timedelta(seconds=3)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, "mysecret")
    
    
    if __name__ == "__main__":
        token = create_access_token({"some": "data"})
        print(token)
        sleep(5)
        print(jwt.decode(token, algorithms=["HS256"], key="mysecret"))
    
    >>eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoiZGF0YSIsImV4cCI6MTcyMTMwMTQ5MH0.KjLGf3chNy5pvx-vsYY_nKbSMNML0-LTR6PfD8dAXJA
    >>Traceback (most recent call last):
    >>...
    >>jose.exceptions.ExpiredSignatureError: Signature has expired.