Начало
Привет, Хабр! Я учусь в Российско-Таджикском Славянском университете (на первом курсе), собственно у нас в университете действует так называемая кредитно-бальная система.
Для просмотра количества набранных баллов и так далее, у нас есть приложение которое было разработано университетом.
Оно доступно для Android.
Однако, я недавно перешёл на iOS-систему, собственно к моему удивлению приложения там не оказалось.
Ну и тут, я подумал что надо бы разработать что-то типа Telegram-бота для просмотра успеваемости, в конце концов это многим должно помочь.
Да и вообще, Telegram работает везде, это моя любимая платформа для обмена сообщениями.
Поиск endpoint'ов...
Разумеется, университет сделал некий API для своего Android-фронтенда, для получения эндпоинтов не я, мой друг, декомпилировал APK файл и предоставил его мне, позже проанализировав "выхлоп" я нашел четыре необходимых мне эндпоинта
В частности эндпоинты для
Авторизации
Получения информации о профиле студента
Получения семестров
Получения данных об успеваемости по всем дисциплинам конкретного семестра.
Ну а дальше, дело за малым, надо просто написать клиент для этого API и так далее.
Инициализация проекта с помощью Poetry, написание обёртки под API
Создаём проект
poetry init
Я сразу создал почти на самом верхнем уровне проекта пакетник rtsu
где и будет лежать наша обёртка.
Давайте посмотрим на api.py
from aiohttp import ClientSession, ContentTypeError, client_exceptions
from cashews import cache
from typing import Optional, Union, Dict, TypeVar, Type, List, Self
from pydantic import BaseModel, parse_obj_as
from .exceptions import NotAuthorizedError, RtsuContentTypeError, ServerError, AuthError
from .schemas import AuthSchema, Profile, Subject, AcademicYear
RTSU_API_BASE_URL = "https://mobile.rtsu.tj/api/v1"
P = TypeVar("P", bound=BaseModel)
class RTSUApi:
"""
This class provides for you functionality of RTSU public API
"""
def __init__(self, token: Optional[str] = None):
"""
Initializes `self`
:param token: A rtsu-api token (optional)
"""
self._api_token = token
self._http_client = ClientSession()
def set_token(self, token: str):
"""
Setups token
:param token: A token
:return:
"""
self._api_token = token
async def _make_request(
self,
method: str,
url_part: str,
response_model: Type[Union[List[BaseModel], BaseModel]],
json: Optional[Dict[str, Union[str, int]]] = None,
params: Optional[Dict[str, str]] = None,
auth_required: bool = False,
) -> Union[P, List[P]]:
"""
Makes call to RTSU API
:param url_part: Part of RTSU-API url, example - /auth
:param json: A json for sending
:param params: URI parameters for sending
:return: Response object
"""
if not json:
json = {}
if not params:
params = {}
headers = {}
if auth_required:
if not self._api_token:
raise NotAuthorizedError("Not authorized, use `.auth` method.")
headers['token'] = self._api_token
try:
response = await self._http_client.request(
method,
f"{RTSU_API_BASE_URL}/{url_part}",
json=json,
params=params,
headers=headers,
ssl=False,
)
except (client_exceptions.ClientConnectionError, client_exceptions.ClientConnectorError) as e:
raise ServerError(f"Connection error, details: {e}")
if response.status != 200:
details = await response.text()
raise ServerError(
f"Server returned {response.status}, details: {details}"
)
try:
deserialized_data = await response.json()
except ContentTypeError as e:
raise RtsuContentTypeError(
e.message,
)
return parse_obj_as(response_model, deserialized_data)
async def auth(self, login: str, password: str) -> AuthSchema:
"""
Authenticates user
:param login: A login of user
:param password: A password of user
:return: RTSU token on success
"""
try:
response: AuthSchema = await self._make_request(
"POST",
"auth",
AuthSchema,
params={
"login": login,
"password": password,
}
)
except ServerError as e:
raise AuthError(
f"Auth error, check login and password, message from server: {e.message}"
)
self._api_token = response.token
return response
@cache.soft(ttl="24h", soft_ttl="1m")
async def get_profile(self) -> Profile:
"""
Returns profile of RTSU student
:return: `Profile`-response
"""
return await self._make_request(
"GET",
"student/profile",
Profile,
auth_required=True,
)
async def get_academic_years(self) -> List[AcademicYear]:
"""
Returns `List` with `AcademicYear` objects
:return:
"""
return await self._make_request(
"GET",
"student/academic_years",
List[AcademicYear],
auth_required=True,
)
@cache.soft(ttl="24h", soft_ttl="1m")
async def get_academic_year_subjects(self, year_id: int) -> List[Subject]:
"""
Returns `List` with `Subjects` of some year
:return:
"""
return await self._make_request(
"GET",
f"student/grades/{year_id}",
List[Subject],
auth_required=True,
)
async def get_current_year_id(self) -> int:
"""
Returns identifier of current year
:return:
"""
years = await self.get_academic_years()
return years[0].id
async def __aenter__(self) -> Self:
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close_session()
def __str__(self) -> str:
"""
Stringifies `RTSUApi` objects
:return:
"""
return f"{self.__class__.__name__}<token={self._api_token}>"
async def close_session(self):
"""Frees inner resources"""
await self._http_client.close()
Что тут у нас? В _make_request
у нас осуществляется запрос к серверу а также десериализация json в pydantic-схему (ну или модель?)
Прошу заметить, что я использую замечательную cashews для кеширования результатов, в частности soft-ttl который еще и сильно помогает когда сервера университета падают.
В остальных же методах я просто указываю эндпоинт и response-schema
ну и дёргаю тот же _make_request
Также, тут есть методы для того чтобы закрыть текущую aiohttp-сессию, ну тут понятно, помимо этого реализованы магические методы __aenter__
и __aexit__
для того чтобы использовать клиент в with
конструкциях.
Ну и set_token
метод которое просто устанавливает значения токена для того чтобы вручную "впихнуть" токен в клиент, это пригодиться нам чуть позже.
Pydantic-схемы
Допустим, заглянем в profile.py
где лежит Profile-schema
from pydantic import Field
from .base import Base
class FullName(Base):
ru: str = Field(alias='RU')
tj: str = Field(alias='TJ')
class Faculty(FullName):
...
class Speciality(FullName):
...
class Profile(Base):
id: int = Field(alias='RecordBookNumber')
full_name: FullName = Field(alias='FullName')
faculty: Faculty = Field(alias="Faculty")
course: int = Field(alias='Course')
training_period: int = Field(alias='TrainingPeriod')
level: str = Field(alias="TrainingLevel")
entrance_year: str = Field(alias='YearUniversityEntrance')
Почему так? API возвращает мне информацию сразу на двух языках (русском и таджикском)
Собственно, это действительно свидетельствует о том, что API сделали очень криво и убого, но что поделать, тут я просто наследую каждый field от FullName чтобы не писать по два раза RU, TJ и так далее.
Также, прошу заметить то, как элегантно можно сделать field'ы pythonic при помощи pydantic-aliases
После написания всего этого, я принялся тестировать клиент, на удивление он хорошо работает и предоставляет данные в том формате, в котором они мне нужны.
Собственно тесты к этому я тоже написал
import pytest
import pytest_asyncio
from rtsu_students_bot.rtsu import RTSUApi
from .config import settings
pytest_plugins = ('pytest_asyncio',)
@pytest_asyncio.fixture()
async def rtsu_client():
"""
Initializes client
:return: Prepared `RTSUApi` client
"""
async with RTSUApi() as api:
yield api
@pytest.mark.asyncio
async def test_rtsu_login(rtsu_client: RTSUApi):
"""
Tests rtsu login
:param rtsu_client: A RTSU API client
:return:
"""
resp = await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)
assert resp.token is not None
@pytest.mark.asyncio
async def test_rtsu_profile_fetching(rtsu_client: RTSUApi):
"""
Tests rtsu profile fetching
:param rtsu_client:
:return:
"""
await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)
profile = await rtsu_client.get_profile()
assert profile is not None
assert profile.full_name is not None
@pytest.mark.asyncio
async def test_rtsu_academic_years_fetching(rtsu_client: RTSUApi):
"""
Tests rtsu academic years fetching
:param rtsu_client:
:return:
"""
await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)
years = await rtsu_client.get_academic_years()
assert type(years) == list
assert len(years) > 0
@pytest.mark.asyncio
async def test_rtsu_academic_year_subjects_fetching(rtsu_client: RTSUApi):
"""
Tests rtsu academic year fetching
:param rtsu_client:
:return:
"""
await rtsu_client.auth(settings.rtsu_api_login, settings.rtsu_api_password)
ac_years = await rtsu_client.get_academic_years()
year = ac_years[0].id
years = await rtsu_client.get_academic_year_subjects(year)
assert type(years) == list
assert len(years) > 0
Тут тесты сделаны наверное грязно и тупо, я пока не читал про best-practises в тестировании API, буду рад предложениям в комментах по этому поводу.
Ах, да, для тестов тут используется отдельный конфиг, в конце статьи я покажу как его заполнить.
На руках у нас уже есть wrapper
База данных и SQLAlchemy
Установив алхимию, я принялся создавать пакет моделей
Но для начала, покажу вам файлик database.py в котором настроено подключение к базе.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from rtsu_students_bot.config import settings
engine = create_async_engine(
settings.db.url,
)
SessionLocal = sessionmaker(bind=engine, class_=AsyncSession)
Тут нет ничего такого, просто вместо обычной сессии я использую асинхронную (по очевидным причинам)
Вернёмся к пакетнику с моделями.
from sqlalchemy import Integer, Column, String, Boolean, BigInteger
from .base import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
full_name = Column(String(length=255), nullable=True)
token = Column(String(length=600), nullable=True)
is_authorized = Column(Boolean, default=False)
telegram_id = Column(BigInteger)
def __str__(self):
return f"{self.__class__.__name__}<id={self.id}, name={self.full_name}>"
Тут всё очень просто, у нас в базе будет храниться токен, telegram-id, ну и флажок который будет сообщать о том, авторизован ли пользователь.
Далее, пилим пакетник service
который будет помогать нам работать с базой данных
Содержимое user.py
from typing import Optional
from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.models import User
from .exceptions import UserNotFound, UserAlreadyExists
async def get_user_by_tg_id(
session: AsyncSession,
telegram_id: int,
) -> Optional[User]:
"""
Returns user by tg-id
:param session: An `AsyncSession` object
:param telegram_id: A telegram-ID
:return: `User` or `None`
"""
stmt = select(User).where(User.telegram_id == telegram_id)
result = await session.execute(stmt)
return result.scalars().first()
async def get_user_by_id(
session: AsyncSession,
user_id: int,
) -> Optional[User]:
"""
Returns user by its id
:param session: An `AsyncSession` object
:param user_id: An ID
:return: `User` or `None`
"""
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
return result.scalars().first()
async def create_user(
session: AsyncSession,
telegram_id: int,
full_name: Optional[str] = None,
token: Optional[str] = None,
):
"""
Creates `User` object
:param session: An `AsyncSession` object
:param telegram_id: A telegram-id
:param full_name: Fullname of user
:param token: A token of user
:return: Created `User`
"""
existed_user = await get_user_by_tg_id(session, telegram_id)
if existed_user is not None:
raise UserAlreadyExists(f"User with ID {telegram_id} already exists.")
is_authorized = token is not None
obj = User(
telegram_id=telegram_id,
full_name=full_name,
token=token,
is_authorized=is_authorized,
)
session.add(obj)
await session.flush()
await session.refresh(obj)
return obj
async def update_user_token(
session: AsyncSession,
telegram_id: int,
token: Optional[str] = None,
) -> User:
"""
Authorizes `User`
:param telegram_id:
:param session:
:param token:
:return:
"""
user = await get_user_by_tg_id(session, telegram_id)
if not user:
raise UserNotFound(f"User with telegram-id {telegram_id} not found.")
is_authorized = token is not None
stmt = update(User).where(
int(user.id) == User.id
).values(
is_authorized=is_authorized,
token=token,
)
await session.execute(stmt)
return await get_user_by_tg_id(session, user.telegram_id)
async def update_user(
session: AsyncSession,
user_id: int,
telegram_id: Optional[int] = None,
full_name: Optional[str] = None,
) -> User:
"""
Updates telegram user
:param session:
:param user_id:
:param telegram_id:
:param full_name:
:return:
"""
user = await get_user_by_id(session, user_id)
if user is None:
raise UserNotFound(f"User with ID {user_id} not found.")
stmt = update(User).where(User.id == user_id)
if telegram_id is not None:
stmt = stmt.values(
telegram_id=telegram_id,
)
if full_name is not None:
stmt = stmt.values(
full_name=full_name
)
await session.execute(stmt)
return await get_user_by_id(session, user_id)
async def delete_user(session: AsyncSession, user_id: int):
"""
Deletes `User` object
:param user_id:
:param session: An `AsyncSession` object
:return:
"""
if await get_user_by_id(session, user_id) is None:
raise ValueError("Invalid user-id passed.")
stmt = delete(User).where(User.id == user_id)
await session.execute(stmt)
Здесь я реализовал обычные функции которые будут помогать мне работать с базой, хотя наверное лучше бы я применил паттерн "Репозиторий"
Тестируем созданный CRUD
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from rtsu_students_bot.service import user
from rtsu_students_bot.models import Base
from .config import settings
pytest_plugins = ('pytest_asyncio',)
engine = create_async_engine(
settings.db_url,
)
SessionLocal = sessionmaker(autoflush=True, bind=engine, class_=AsyncSession)
@pytest_asyncio.fixture()
async def session():
"""
Initializes client
:return: Prepared `RTSUApi` client
"""
async with SessionLocal() as e, e.begin():
yield e
@pytest.mark.asyncio
async def test_tables_creating():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@pytest.mark.asyncio
async def test_user_creation(session: AsyncSession):
"""
Tests user-creation
:return:
"""
user_data = {
"full_name": "Vladimir Putin",
"telegram_id": 1,
}
created_user = await user.create_user(session, **user_data)
assert created_user.full_name == user_data.get("full_name")
assert created_user.telegram_id == user_data.get("telegram_id")
@pytest.mark.asyncio
async def test_user_update(session: AsyncSession):
"""
Tests user updating
:param session:
:return:
"""
updating_data = {
"full_name": "Volodymir Zelensky"
}
first_user = await user.get_user_by_tg_id(session, 1)
updated_user = await user.update_user(session, first_user.id, **updating_data)
assert first_user.id == updated_user.id
assert first_user.telegram_id == updated_user.telegram_id
assert updated_user.full_name == updating_data.get("full_name")
@pytest.mark.asyncio
async def test_user_token_updating(session: AsyncSession):
"""
Tests user-token updating
:param session:
:return:
"""
first_user = await user.get_user_by_tg_id(session, 1)
assert not first_user.is_authorized
first_user = await user.update_user_token(session, first_user.telegram_id, token="test token")
assert first_user.is_authorized
assert first_user.token == "test token"
assert first_user.telegram_id == 1
@pytest.mark.asyncio
async def test_user_deleting(session: AsyncSession):
"""
Tests user-token updating
:param session:
:return:
"""
first_user = await user.get_user_by_tg_id(session, 1)
assert first_user is not None
await user.delete_user(session, first_user.id)
first_user = await user.get_user_by_tg_id(session, 1)
assert first_user is None
Тут также создается подключение к базе данных, я просто проверяю CRUD на работоспособность, собственно данные тесты помогли мне найти одну ошибку, поэтому я думаю писал я их не зря =)
Пилим самого бота
Для работы с Telegram Bot API я несомненно выбрал Aiogram ну и для этого сделал еще один пакетник.
Middlewares
Для того чтобы "протаскивать" необходимые ресурсы к хендлерам (API-клиент и AsyncSession), мне нужен специальный для этих целей мидлварь
import logging
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram import types
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.rtsu import RTSUApi
from rtsu_students_bot.database import engine
class ResourcesMiddleware(BaseMiddleware):
"""
Middleware for providing resources like db-connection and RTSU-client
"""
def __init__(self):
"""
Initializes self
"""
self._logger = logging.getLogger("resources_middleware")
super().__init__()
@staticmethod
async def _provide_api_client() -> RTSUApi:
"""
Provides `RTSU` api client
:return: Initialized client
"""
client = RTSUApi()
return client
@staticmethod
async def _provide_db_session() -> AsyncSession:
"""
Provides `AsyncSession` object
:return: Initialized session
"""
session = AsyncSession(engine)
return session
async def _provide_resources(self) -> dict:
"""
Initializes & provides needed resources, such as `RTSU-api-client` and `AsyncSession`
:return:
"""
self._logger.debug("Providing resources")
api_client = await self._provide_api_client()
db_session = await self._provide_db_session()
resources = {
"rtsu": api_client,
"db_session": db_session,
}
return resources
async def _cleanup(self, data: dict):
"""
Closes connections & etc.
:param data:
:return:
"""
self._logger.debug("Cleaning resources")
if "db_session" in data:
self._logger.debug("SQLAlchemy session detected, closing connection.")
session: AsyncSession = data["db_session"]
await session.commit() # Commit changes
await session.close()
if "rtsu" in data:
self._logger.debug("RTSU API Client detected, closing resource.")
api_client: RTSUApi = data["rtsu"]
await api_client.close_session()
async def on_pre_process_message(self, update: types.Message, data: dict):
"""
For pre-processing `types.Update`
:param data: Data from other middlewares
:param update: A telegram-update
:return:
"""
resources = await self._provide_resources()
data.update(resources)
return data
async def on_pre_process_callback_query(self, query: types.CallbackQuery, data: dict):
"""
Method for preprocessing callback-queries
:param query: A callback-query
:param data: A data from another middleware
:return:
"""
resources = await self._provide_resources()
data.update(resources)
return data
async def on_post_process_callback_query(self, query: types.CallbackQuery, data_from_handler: list, data: dict):
"""
Method for post-processing callback query
:param data_from_handler: Data from handler
:param query: A callback query
:param data: A data from another middleware
:return:
"""
await self._cleanup(data)
async def on_post_process_message(self, message: types.Message, data_from_handler: list, data: dict):
"""
For post-processing message
:param data_from_handler:
:param message:
:param data:
:return:
"""
await self._cleanup(data)
Тут все очень просто, при старте обработки сообщений/CallbackQuery я просто подгружаю ресурсы и передаю их в data
Также, после обработки всего этого дела, я вызываю _cleanup
который при обнаружении сессий закроет их (ну и сделает коммит в случае с алхимией)
Получение пользователя
Для этого я сделал тоже отдельный мидлварь, который разумеется отрабатывает после первого
import logging
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram import types
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.service import user
from rtsu_students_bot.rtsu import RTSUApi
class UserMiddleware(BaseMiddleware):
"""
Middleware for providing a `User` object
"""
def __init__(self):
"""
Initializes self
"""
self._logger = logging.getLogger("users_middleware")
super().__init__()
async def _provide_user(self, user_id: int, data: dict) -> dict:
"""
Fetches and returns user
"""
if 'db_session' not in data:
raise RuntimeError("AsyncSession not found.")
if 'rtsu' not in data:
raise RuntimeError("RTSU API client not found.")
db_session: AsyncSession = data.get("db_session")
rtsu_client: RTSUApi = data.get("rtsu")
self._logger.debug(f"Getting user with ID {user_id}")
u = await user.get_user_by_tg_id(db_session, user_id)
if u is None:
self._logger.debug(f"User with ID {user_id} not found, creating...")
u = await user.create_user(db_session, telegram_id=user_id)
self._logger.debug(f"User provided, {u}")
# If user is authorized, lets setup `RTSU` client
if u.is_authorized:
rtsu_client.set_token(u.token)
self._logger.debug("User is authorized, API-client's token initialized.")
data["user"] = u
return data
async def on_pre_process_message(self, message: types.message, data: dict):
"""
Method for preprocessing messages (provides user)
:param message: A message
:param data: A data from another middleware
:return: None
"""
return await self._provide_user(message.from_user.id, data)
async def on_pre_process_callback_query(self, query: types.CallbackQuery, data: dict):
"""
Method for preprocessing callback-queries (provides user)
:param data:
:param query:
:return:
"""
return await self._provide_user(query.from_user.id, data)
Конкретно тут, мы просто создаем пользователя если на нашли его в бд, помимо того, если пользователь авторизован (токен есть в базе данных) мы также инициализируем токенAPI-клиента.
Теперь у нас есть почти всё что нужно для обработки сообщений.
Шаблонизатор Jinja2
Хорошо бы выделить "шаблоны" куда-то, собственно, я решил воспользоватся шаблонизатором "Нинздя"
Для этих целей я создал файлик template_engine.py
from typing import Optional, Any, Dict
from jinja2 import Environment, PackageLoader, select_autoescape
env = Environment(
loader=PackageLoader('rtsu_students_bot', 'templates'),
autoescape=select_autoescape(['html'])
)
def render_template(name: str, values: Optional[Dict[str, Any]] = None, **kwargs):
"""
Renders template & returns text
:param name: Name of template
:param values: Values for template (optional)
:param kwargs: Keyword-arguments for template (high-priority)
"""
template = env.get_template(name)
if values:
rendered_template = template.render(values, **kwargs)
else:
rendered_template = template.render(**kwargs)
return rendered_template
Тут я инициализировал шаблонизатор и сделал функцию render_template
которая просто рендерит шаблон и возвращает готовый текст.
Шаблоны
Для них я сделал отдельную директорию в проекте, там ничего интересного я думаю вы не найдете.
Вот например шаблон для отправки профиля студента.
<b>???? Профиль</b>
<b>???? Полное имя: <em> {{ profile.full_name.ru }} </em></b>
<b>⚙️ ID Студента: <code>{{ profile.id }}</code></b>
<b>???? Факультет: <em>{{ profile.faculty.ru }} </em></b>
<b>ℹ️ Курс: <code>{{ profile.course }}</code></b>
<b>⏳ Период обучения: {{ profile.training_period }}{% if profile.training_period < 5 %} года {% else %} лет {% endif %}</b>
<b>???? Год поступления: {{ profile.entrance_year }}</b>
<b>???? Степень образования: {{ profile.level }} </b>
В общем, как-то так.
Бежим писать хендлеры
В core.py
я положил общую функциональность которая может присутствовать в нескольких хендлерах, чтобы собственно соответсовать DRY
"""
`core.py` - Core-functionality of bot
"""
from typing import Union, List
from aiogram import types
from aiogram.dispatcher import FSMContext
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.service import user
from rtsu_students_bot.rtsu import RTSUApi, exceptions, schemas
from rtsu_students_bot.bot.keyboards import inline, reply
from rtsu_students_bot.bot.states import AuthState
from rtsu_students_bot.models import User
from rtsu_students_bot.template_engine import render_template
async def start_auth(
update: Union[types.CallbackQuery, types.Message],
user_in_db: User
):
"""
Core-function, starts authentification process
:param update: An `update` (message or query)
:param user_in_db: An User in database
:return:
"""
markup = None
text = render_template(
"auth.html",
user=user_in_db
)
if not user_in_db.is_authorized:
await AuthState.first()
markup = inline.cancellation_keyboard_factory()
await update.bot.send_message(
update.from_user.id,
text=text,
reply_markup=markup
)
async def show_profile(
message: types.Message,
rtsu_client: RTSUApi,
user_in_db: User,
):
"""
Shows information about profile
:param message:
:param rtsu_client:
:param user_in_db:
:return:
"""
profile = await rtsu_client.get_profile()
text = render_template(
"profile.html",
profile=profile,
user=user_in_db,
telegram_user=message.from_user
)
await message.bot.send_message(
message.from_user.id,
text,
reply_markup=inline.message_hiding_keyboard()
)
async def show_statistics(
message: types.Message,
rtsu_client: RTSUApi,
):
"""
Shows user's statistics
:param message: A message
:param rtsu_client: Initialized RTSU API client
:return:
"""
current_year_id = await rtsu_client.get_current_year_id()
subjects: List[schemas.Subject] = await rtsu_client.get_academic_year_subjects(current_year_id)
await message.bot.send_message(
message.chat.id,
text=render_template("statistics.html", subjects=subjects),
reply_markup=inline.message_hiding_keyboard()
)
async def authorize_user(
update: Union[types.CallbackQuery, types.Message],
user_in_db: User,
login: str,
password: str,
db_session: AsyncSession,
rtsu_client: RTSUApi,
state: FSMContext
):
"""
Authorizes user, on success auth, saves token to database
:param state: A state (fsm-context)
:param rtsu_client: An initialized RTSU api client
:param db_session: `AsyncSession` object
:param password: A password
:param login: A login
:param user_in_db: A user in database
:param update: Update (message or query)
"""
try:
auth_schema = await rtsu_client.auth(login, password)
except exceptions.AuthError:
await update.bot.send_message(
update.from_user.id,
text=render_template("auth_error.html"),
reply_markup=inline.cancellation_keyboard_factory()
)
await AuthState.first()
return
profile = await rtsu_client.get_profile()
await user.update_user(
db_session,
user_in_db.id,
full_name=profile.full_name.ru,
)
await user.update_user_token(
db_session,
update.from_user.id,
auth_schema.token,
)
await update.bot.send_message(
update.from_user.id,
text=render_template("auth_success.html", full_name=profile.full_name.ru),
reply_markup=reply.main_menu_factory()
)
await state.finish()
async def show_subjects(
message: types.Message,
rtsu_client: RTSUApi
):
"""
Shows user's subjects
:param message:
:param rtsu_client:
:return:
"""
current_year = await rtsu_client.get_current_year_id()
subjects = await rtsu_client.get_academic_year_subjects(current_year)
await message.bot.send_message(
message.chat.id,
text="???? Дисциплины",
reply_markup=inline.subjects_keyboard_factory(subjects)
)
async def logout_user(
message: types.Message,
user_in_db: User,
db_session: AsyncSession
):
"""
Sets user's token to `NULL`
:param db_session: `AsyncSession` object
:param message: A message
:param user_in_db: A user in db
:return:
"""
await user.update_user_token(
db_session,
user_in_db.telegram_id,
token=None
)
await message.bot.send_message(
message.from_user.id,
text=render_template("logout.html", user=user_in_db),
reply_markup=inline.auth_keyboard_factory()
)
async def show_help(
message: types.Message
):
"""
Shows help-menu
:param message: A message
:return:
"""
await message.bot.send_message(
message.from_user.id,
text=render_template("help.html"),
reply_markup=inline.message_hiding_keyboard()
)
async def show_about(
message: types.Message
):
"""
Shows about-menu
:param message: A message
:return:
"""
await message.bot.send_message(
message.from_user.id,
text=render_template("about.html"),
disable_web_page_preview=True,
reply_markup=inline.message_hiding_keyboard()
)
Тут вам и стейт-машина ну и клавиатурки собственно тоже есть.
Давайте заглянем непосредственно в хендлеры, в частности в commands.py
from aiogram import types, Dispatcher
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.rtsu import RTSUApi
from rtsu_students_bot.bot.filters import AuthorizationFilter
from rtsu_students_bot.models import User
from rtsu_students_bot.bot.keyboards import inline, reply
from rtsu_students_bot.template_engine import render_template
from . import core
async def start(message: types.Message, user: User):
"""
Handles `/start` cmd
:param user:
:param message: A message
"""
markup = reply.main_menu_factory()
if not user.is_authorized:
markup = inline.auth_keyboard_factory()
await message.reply(
text=render_template(
"start.html",
user=user,
telegram_user=message.from_user
),
reply_markup=markup,
)
async def auth(message: types.Message, user: User):
"""
Handles `/auth` cmd
:param user: A User
:param message: A message
"""
await core.start_auth(message, user)
async def help_cmd(message: types.Message):
"""
Handles `help` cmd
:param message: A message
"""
await core.show_help(message)
async def statistics(message: types.Message, rtsu: RTSUApi):
"""
Handles `statistics` cmd
:param message: A message
:param rtsu: Initialized RTSU API client
"""
await core.show_statistics(message, rtsu)
async def subjects(message: types.Message, rtsu: RTSUApi):
"""
Handles `subjects` cmd
:param message: A message
:param rtsu: Initialized RTSU API client
"""
await core.show_subjects(message, rtsu)
async def profile(message: types.Message, rtsu: RTSUApi, user: User):
"""
Handles `profile` cmd
:param message: A message
:param user: User in db
:param rtsu: Initialized RTSU API client
"""
await core.show_profile(message, rtsu, user)
async def logout(message: types.Message, user: User, db_session: AsyncSession):
"""
Handles `logout` cmd
:param db_session: `AsyncSession` object
:param user: A user in db
:param message: A message
"""
await core.logout_user(message, user, db_session)
async def about(message: types.Message):
"""
Handles `about` cmd
:param message: A message
"""
await core.show_about(message)
def setup(dp: Dispatcher):
"""
Setups commands-handlers
:param dp:
:return:
"""
dp.register_message_handler(start, commands=["start"])
dp.register_message_handler(help_cmd, commands=["help"])
dp.register_message_handler(about, commands=["about"])
dp.register_message_handler(logout, AuthorizationFilter(True), commands=["logout"])
dp.register_message_handler(profile, AuthorizationFilter(True), commands=["profile"])
dp.register_message_handler(subjects, AuthorizationFilter(True), commands=["subjects"])
dp.register_message_handler(statistics, AuthorizationFilter(True), commands=["stat"])
dp.register_message_handler(auth, AuthorizationFilter(False), commands=["auth"])
dp.register_message_handler(auth, AuthorizationFilter(authorized=True), commands=["auth"])
Тут я просто регистрирую команды и дёргаю всё что мне нужно из core.py
Обратите внимание на AuthorizationFilter
, он проверяет наличие авторизации.
Сразу возникает логичный вопрос, а почему я не использовал BoundFilter ?
Ответ кроется в том, что он багнутый, в начале я его и использовал, однако он работал абсолютно некорректно, я где-то час бился с ним но в итоге решил переписать фильтр используя обычный AbstractFilter
он заработал сразу же, с первого раза.
Сообщать об этом разработчикам Aiogram - я не особо и имею желания, они в последнее время очень недружелюбны и очень странно относятся к русскоговорящим.
Меня например в чате aiogram тупо забанили :/
Собственно, привожу filters/auth.py
и содержимое AuthorizationFilter
from typing import Union
from aiogram.dispatcher.filters import Filter
from aiogram.dispatcher.handler import ctx_data
from aiogram import types
from rtsu_students_bot.models import User
from rtsu_students_bot.template_engine import render_template
class AuthorizationFilter(Filter):
"""
Filter for checking user's authorization
"""
def __init__(self, authorized: bool):
"""
Initializes self
:param authorized:Is admin?
"""
self.authorized = authorized
async def check(self, message: Union[types.Message, types.CallbackQuery]):
"""
Checks for user's authorization status
:param message: A message
"""
data = ctx_data.get()
user: User = data.get("user")
if self.authorized is None:
return True
if self.authorized and not user.is_authorized:
await message.bot.send_message(
message.from_user.id,
text=render_template("not_authorized.html")
)
return False
elif not self.authorized and user.is_authorized:
await message.bot.send_message(
message.from_user.id,
text=render_template("already_authorized.html")
)
return False
return True
Тут ничего сложного, я просто забираю данные из мидлваря, ну и если юзеру нужна авторизация но её - нет, я прерываю обработку апдейта вернув False
, ну или наоборот если юзеру не нужна авторизация но она у него есть, я также прерываю обработку апдейта ну и не забываю выдавать ему сообщение, ничего такого.
Давайте взглянем на text.py
и обработчики текста
"""
`text.py` - Text handlers
"""
from aiogram import types, Dispatcher
from aiogram.dispatcher import FSMContext
from aiogram.dispatcher.filters import Text
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.bot.filters import AuthorizationFilter
from rtsu_students_bot.rtsu import RTSUApi
from rtsu_students_bot.models import User
from rtsu_students_bot.bot.states import AuthState
from rtsu_students_bot.template_engine import render_template
from rtsu_students_bot.bot.keyboards import inline
from . import core
async def login_handler(
message: types.Message,
state: FSMContext
):
"""
Handles `login` of user
:param message: A message with login
:param state: A current state (fsm-context)
"""
async with state.proxy() as data:
data["login"] = message.text
await AuthState.next()
await message.delete()
await message.bot.send_message(
message.from_user.id,
render_template("enter_password.html"),
reply_markup=inline.cancellation_keyboard_factory()
)
async def password_handler(
message: types.Message,
state: FSMContext,
):
"""
Handles password of user
:param message: A message (with password)
:param state: A state (fsm-context)
"""
async with state.proxy() as data:
password = data["password"] = message.text
login = data["login"]
await AuthState.next()
await message.delete()
await message.bot.send_message(
message.from_user.id,
render_template("credentials_confirmation.html", login=login, password=password),
reply_markup=inline.confirmation_keyboard_factory()
)
async def show_profile_handler(
message: types.Message,
rtsu: RTSUApi,
user: User,
):
"""
Handles 'Show profile' request
:param message: A message
:param rtsu: Initialized RTSU API client
:param user: A user from db
:return:
"""
await core.show_profile(message, rtsu, user)
async def show_statistics_handler(
message: types.Message,
rtsu: RTSUApi,
):
"""
Shows user's statistics
:param message:
:param rtsu:
:return:
"""
await core.show_statistics(message, rtsu)
async def show_subjects_handler(
message: types.Message,
rtsu: RTSUApi
):
"""
Handles 'Show statistics' request
:param rtsu: Initialized RTSU API client
:param message: A message
:return:
"""
await core.show_subjects(message, rtsu)
async def logout_handler(
message: types.Message,
user: User,
db_session: AsyncSession,
):
"""
Handles 'logout-request'
:param db_session: `AsyncSession` object
:param message: A message
:param user: A user in db
"""
await core.logout_user(message, user, db_session)
async def auth_handler(message: types.Message, user: User):
"""
Handles 'auth-request'
:param message: A message
:param user: A user in db
:return:
"""
await core.start_auth(message, user)
async def help_handler(message: types.Message):
"""
Handles 'help-request'
:param message: A message
:return:
"""
await core.show_help(message)
async def about_handler(message: types.Message):
"""
Handles 'about-request'
:param message: A message
:return:
"""
await core.show_about(message)
def setup(dp: Dispatcher):
"""
Setups text-handlers
:param dp: A `Dispatcher` instance
"""
dp.register_message_handler(
login_handler,
state=AuthState.login,
content_types=[types.ContentType.TEXT],
)
dp.register_message_handler(password_handler, state=AuthState.password, content_types=[types.ContentType.TEXT])
dp.register_message_handler(
show_profile_handler, Text(equals="???? Профиль"), AuthorizationFilter(authorized=True)
)
dp.register_message_handler(
show_statistics_handler, Text(equals="???? Статистика"), AuthorizationFilter(authorized=True)
)
dp.register_message_handler(
show_subjects_handler, Text(equals="???? Дисциплины"), AuthorizationFilter(authorized=True)
)
dp.register_message_handler(
logout_handler, Text(equals="◀️ Выход из системы"), AuthorizationFilter(authorized=True)
)
dp.register_message_handler(
auth_handler, Text(equals="???? Авторизация"), AuthorizationFilter(authorized=False)
)
dp.register_message_handler(
help_handler, Text(equals="???? Инструкция"),
)
dp.register_message_handler(
about_handler, Text(equals="ℹ️ О боте"),
)
Тут мы обрабатываем кнопки главной менюшки и также дёргаем core.py
для необходимого функционала, ну и помимо этого тут обрабатываются логин/пароль
Конечно, лучше бы выделить текста кнопок куда-то в константы, чем я и займусь скоро.
Теперь взглянем на CallbackQuery-handlers
from aiogram import types, Dispatcher
from aiogram.dispatcher import FSMContext
from sqlalchemy.ext.asyncio import AsyncSession
from rtsu_students_bot.bot.filters import AuthorizationFilter
from rtsu_students_bot.rtsu import RTSUApi
from rtsu_students_bot.template_engine import render_template
from rtsu_students_bot.models import User
from rtsu_students_bot.bot.keyboards import callbacks, inline
from rtsu_students_bot.bot.states import AuthState
from .core import start_auth, authorize_user
async def auth_callback_processor(
query: types.CallbackQuery,
user: User
):
"""
Handles `callbacks.AUTH_CALLBACK`
:param query: A callback-query
:param user: An User
"""
await start_auth(query, user)
async def cancel_callback_processor(
query: types.CallbackQuery,
state: FSMContext
):
"""
Handles `callbacks.CANCELLATION_CALLBACK`
:param state: A current state (fsm-context)
:param query:
:return:
"""
await query.message.delete()
await query.bot.send_message(
query.from_user.id,
text=render_template("cancellation.html")
)
await state.finish()
async def credentials_confirmation_callback_processor(
query: types.CallbackQuery,
callback_data: dict,
db_session: AsyncSession,
user: User,
rtsu: RTSUApi,
state: FSMContext
):
"""
Processes `callbacks.CONFIRMATION_CALLBACK`
:param state: A current state (fsm-context)
:param rtsu: An initialized rtsu-api client
:param user: A user in database
:param db_session: `AsyncSession` object
:param query: A callback-query
:param callback_data: Callback's data
"""
# If user clicks `Yes` - `1` will be passed
# If user clicks `No` - `0` will be passed
# So, all data will be represented as strings in telegram-callbacks
# For getting boolean some converting needed
# Firstly, we convert string to int, after, we convert this int to boolean
ok = bool(int(callback_data.get("ok")))
await query.answer()
await query.message.delete()
async with state.proxy() as data:
login = data.get("login")
password = data.get("password")
if ok:
await authorize_user(query, user, login, password, db_session, rtsu, state)
else:
await query.bot.send_message(
query.from_user.id,
text=render_template("auth.html", user=user),
reply_markup=inline.cancellation_keyboard_factory()
)
await AuthState.first()
async def show_subject_processor(
query: types.CallbackQuery,
rtsu: RTSUApi,
callback_data: dict,
):
"""
Handles `callbacks.SUBJECT_CALLBACK`
:param callback_data: A callback-data
:param query: A query
:param rtsu: Initialized RTSU API client
"""
await query.answer()
needed_subject_id = int(callback_data.get("id"))
year = await rtsu.get_current_year_id()
subjects = await rtsu.get_academic_year_subjects(year)
needed_subject = list(filter(lambda x: x.id == needed_subject_id, subjects))
if not needed_subject:
await query.bot.send_message(
query.from_user.id,
"Дисциплина не найдена."
)
return
await query.bot.send_message(
query.from_user.id,
text=render_template(
"subject.html",
subject=needed_subject[0]
)
)
async def delete_message_callback_processor(query: types.CallbackQuery):
"""
Processes deletion-callback
:param query: A callback-query
:return:
"""
await query.answer()
await query.message.delete()
def setup(dp: Dispatcher):
"""
Registers callback-query handlers
:param dp: A `Dispatcher` instance
"""
dp.register_callback_query_handler(
auth_callback_processor, callbacks.AUTH_CALLBACK.filter(), AuthorizationFilter(False)
)
dp.register_callback_query_handler(cancel_callback_processor, callbacks.CANCELLATION_CALLBACK.filter())
dp.register_callback_query_handler(
credentials_confirmation_callback_processor, callbacks.CONFIRMATION_CALLBACK.filter(), state=AuthState.confirm
)
dp.register_callback_query_handler(
show_subject_processor, callbacks.SUBJECT_CALLBACK.filter(), AuthorizationFilter(True)
)
dp.register_callback_query_handler(
delete_message_callback_processor, callbacks.DELETE_MSG_CALLBACK.filter()
)
Тут также дёргается некоторый функционал из core.py но помимо этого, есть и обработчики которые по сути самостоятельные и выдают какую-то информацию.
Тут вам и сабмиты логинов/паролей и выдача отчёта по конкретной дисциплине ну и так далее всё, что связано с inline-клавиатурой.
В целом, есть еще и обработчики ошибок, вот они
import logging
from aiogram import Dispatcher, types
from aiogram.utils.exceptions import InvalidQueryID, MessageNotModified
from rtsu_students_bot.rtsu.exceptions import ServerError
from rtsu_students_bot.template_engine import render_template
async def invalid_query_error_handler(update, error):
"""
Handles `InvalidQueryError`
:param update:
:param error:
:return:
"""
logging.info(f"OK, Invalid query ID, {error}, {update}")
return True
async def message_not_modified_error_handler(update, error):
"""
Handles `MessageNotModifiedError`
:param update:
:param error:
:return:
"""
logging.info(f"OK, Message not modified, {error}, {update}")
return True
async def server_error_handler(update: types.Update, error: ServerError):
"""
Handles `ServerError`
:param update:
:param error:
:return:
"""
logging.exception("Server error", exc_info=error)
if update.message:
chat_id = update.message.from_user.id
else:
chat_id = update.callback_query.from_user.id
await update.bot.send_message(
chat_id=chat_id,
text=render_template("server_error.html")
)
return True
async def any_exception_handler(update: types.Update, error: Exception):
"""
Handles `Exception`
:param update:
:param error:
:return:
"""
logging.error(f"{error.__class__.__name__} has been thrown")
logging.exception("Exception", exc_info=error)
return True
def setup(dp: Dispatcher):
"""
Registers error handlers
:param dp: A `Dispatcher` instance
"""
dp.register_errors_handler(
server_error_handler,
exception=ServerError
)
dp.register_errors_handler(
invalid_query_error_handler,
exception=InvalidQueryID
)
dp.register_errors_handler(
message_not_modified_error_handler,
exception=MessageNotModified
)
dp.register_errors_handler(
any_exception_handler,
exception=Exception
)
Тут мы просто обрабатываем и пропускаем некоторые ошибки, а об ServerError
мы сообщаем пользователю.
Думаю тоже логично.
Теперь взглянем на главный файл в пакете bot
, а именно app.py
from aiogram import Bot, Dispatcher
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from rtsu_students_bot.config import settings
from . import handlers, middlewares
def get_app() -> Dispatcher:
"""
Initializes & returns `Dispatcher`
"""
# Create bot & dispatcher
memory_storage = MemoryStorage()
bot = Bot(settings.bot.token, parse_mode="html")
dp = Dispatcher(bot, storage=memory_storage)
# Setup handlers
handlers.setup(dp)
# Setup middlewares
middlewares.setup(dp)
return dp
Тут мы просто создаем объекты диспетчера и бота, регистрируем обработчики, мидлвари ну и задаем storage для нашего FSM
Да, у меня пока что всё хранится в MemoryStorage
, возможно позже сделаю RedisStorage
, увидим.
Ну и на последок, cli.py
& config.py
from aiogram import executor
from typer import Typer, Option
from .config import settings
from .bot import get_app, handlers
typer_app = Typer()
@typer_app.command()
def start(
skip_updates: bool = Option(default=False, help="Skip telegram updates on start?"),
use_webhook: bool = Option(default=False, help="Use webhook for receiving updates?")
):
"""
Starts bot
:param skip_updates: Skip telegram updates on start?
:param use_webhook: Use webhook mode for receiving updates?
"""
# Build bot
dp = get_app()
# Build startup-handler
startup_handler = handlers.startup.startup_handler_factory()
if use_webhook:
# Check for `webhook` settings are not `None`
if settings.webhooks is None:
print("Please, fill webhook's settings.")
exit(-1)
startup_handler = handlers.startup.startup_handler_factory(f"{settings.webhooks.host}{settings.webhooks.path}")
executor.start_webhook(
dispatcher=dp,
on_startup=startup_handler,
skip_updates=skip_updates,
host=settings.webhooks.webapp_host,
port=settings.webhooks.webapp_port,
webhook_path=settings.webhooks.path,
check_ip=True
)
else:
executor.start_polling(
dp,
skip_updates=skip_updates,
on_startup=startup_handler
)
Тут у меня просто используется Typer для обработки аргументов командной строки, в целом тут я просто подгружаю конфиг и запускаю бота, startup-хендлеры я думаю вы посмотрите уже сами, если интересно, там просто настройка логов, бд, кеша и всего остального, вряд-ли вам это покажется интересным.
Ну и собственно config.py
from typing import Optional
from pydantic import BaseSettings
from .constants import DEFAULT_ENCODING, SETTINGS_FILE
class DatabaseSettings(BaseSettings):
"""Settings of database"""
url: str
class BotSettings(BaseSettings):
"""Settings of telegram-bot"""
token: str
class Logging(BaseSettings):
format: str
debug: bool
class Webhooks(BaseSettings):
host: str
path: str
webapp_host: str
webapp_port: int
class Settings(BaseSettings):
"""Class for settings"""
bot: BotSettings
logging: Logging
db: DatabaseSettings
webhooks: Optional[Webhooks] = None
settings = Settings.parse_file(
path=SETTINGS_FILE,
encoding=DEFAULT_ENCODING
)
Тут тоже ничего такого нет, использую обычные настройки из Pydantic и гружу их в settings
Модули в питоне кешируются, поэтому такой вид конфигурации считаю в принципе справедливым =)
Ну и собственно, скриншоты получившегося бота
Собственно, на этом думаю всё, все ссылки прикрепляю.
Использованные библиотеки/фреймворки
Aiogram
Pydantic
Aiohttp
Cashews
SQLAlchemy
Poetry
Typer
Jinja2
Спасибо что прочитали до конца, ожидаю объективной критики и предложений по улучшению качества кода для своего же развития.
Комментарии (9)
inhell
00.00.0000 00:00+4Используйте спойлеры, не всем удобно скролить ,)
зы название универа огонь ,)
razoryoutub
00.00.0000 00:00+1Код выглядит взрослым для первого курса, достаточно чистый. Статья немного перегружена этим самым кодом, лучше использовать спойлеры.
Также, может не увидел, но рекомендовал бы сделать средство сбора и просмотра (для админ(а/ов) естественно) статистики использования бота. Просто может так оказаться, что пользователей много, а реальных использования 2-3 в день)
persii
Читать очень понятно и очень приятно. Спасибо! Очень раннему начинающему вроде меня много чего есть взять на заметку.
Awaitable Автор
Вам спасибо =)