Привет! Меня зовут Александр, я руководитель бэкенд-разработки в KTS.
Сегодня я покажу, как написать своего Телеграм-бота на основе asyncio и aiohttp. Мы не будем использовать ни aiogram, ни любые другие библиотеки, а напишем бота с нуля, чтобы немного познакомиться с асинхронным программированием, корутинами и некоторыми примитивами синхронизации.
Что будет в статье:
-
Реализация схемы:
Основы ботоведения
Бот — сущность в мессенджере.
Они есть практически везде: в Телеграме, Инстаграме, ВК. Это программа, которая на основе определенных алгоритмов отвечает на сообщения пользователей. Задачи и направления самые разные: бот может просто присылать сообщения «С добрым утром!» А может, например, управлять участниками внутри чата.
Перед началом работы с ботом его нужно создать и познакомиться с Telegram API.
Шаг 1. Сначала создаем бота с помощью BotFather внутри Телеграма:
https://core.telegram.org/bots#6-botfather
После создания вы получите сообщение:
Done! Congratulations on your new bot...
Use this token to access the HTTP API:
<token>
...
В сообщении будет токен, который нужно использовать для запросов в Телеграм.
Шаг 2. Выполните первый запрос к Telegram API по инструкции: https://core.telegram.org/bots/api#authorizing-your-bot
В ответ вы должны получить что-то подобное:
{"ok":true,"result":{"id":2065163148,"is_bot":true,"first_name":"Metaclass","username":"metaclassbot","can_join_groups":true,"can_read_all_group_messages":false,"supports_inline_queries":false}}
Шаг 3. Далее нужно научиться получать уведомления из Телеграма. Существует 2 способа:
webhook — инициатором запроса выступает Телеграм.
Когда пользователь пишет боту, Телеграм делает запрос на URL, который вы установите с помощью метода setwebhook. У этого метода есть недостатки: его трудно отлаживать, так как должен быть публичный адрес бота. Также на каждое сообщение выполняется HTTP-запрос, и при 1000+ сообщений в секунду серверы не справятся.long polling — инициатором является ваше приложение.
Оно обращается к Telegram API и получает уведомления или ожидает, если уведомлений нет — отсюда название long.
Мы для получения уведомлений будем использовать long polling. Описание метода.
Выполните в браузере такой запрос:
https://api.telegram.org/bot<token>/getUpdates
Он завершится моментально и вернет:{"ok":true,"result":[]}
-
Теперь выполните другой запрос:
https://api.telegram.org/bot<token>/getUpdates?timeout=30
Он будет висеть 30 секунд и, если ничего не написать боту, вернет:
{"ok":true,"result":[]}
Если написать боту хоть что-то, он моментально вернет Update.
Шаг 4. Теперь давайте научимся отправлять сообщение пользователю в ответ. Сначала отправим сообщение боту, для чего используем метод sendMessage. Метод принимает два обязательных параметра:
chat_id
— поле, которое пришло в объекте Update. Может быть как id персонального чата, так и id группового чата.text
Выполните в браузере запрос. chat_id
нужно получить из предыдущего запроса: https://api.telegram.org/bot<token>/sendMessage?chat_id=85364161&text=hello.
В результате бот напишет вам сообщение hello.
Для работы с методами Telegram API будем использовать класс TgClient.
Echo-бот
Для начала давайте получим сообщения из Telegram. Для этого нужно вызвать метод get_updates
класса TgClient
:
import asyncio
import os
from clients.tg import TgClient
async def run_echo():
c = TgClient(os.getenv("BOT_TOKEN"))
print(await c.get_updates(offset=0, timeout=5))
if __name__ == "__main__":
asyncio.run(run_echo())
Обратите внимание, что код получает токен бота из переменной окружения, поэтому перед запуском нужно установить значение переменной BOT_TOKEN
.
При запуске кода может быть два исхода:
Если боту отправляли сообщение не более 24 часов назад, метод
get_updates
вернет все последние сообщения. Чтобы было проще, далее по тексту я буду использовать термин «новые сообщения», а не «новые объекты update».
{'ok': True, 'result': [{'update_id': 503972397, 'message': {'message_id': 335, 'from': {'id': 85364161, 'is_bot': False, 'first_name': 'Alexander', 'last_name': 'Opryshko', 'username': 'alexopryshko', 'language_code': 'en'}, 'chat': {'id': 85364161, 'first_name': 'Alexander', 'last_name': 'Opryshko', 'username': 'alexopryshko', 'type': 'private'}, 'date': 1644871206, 'text': '1'}}]}
-
Если боту не писали вообще, или писали давно, он зависнет на 5 секунд (timeout=5) и вернет пустой список:
{'ok': True, 'result': []}
при этом, если боту написать во время ожидания, он моментально вернет результат, как и в первом случае.
Чтобы получать сообщения из Телеграма постоянно, в цикле нужно вызывать get_updates
, так как он завершается сразу после написания боту нового сообщения.
async def run_echo():
c = TgClient(os.getenv("BOT_TOKEN"))
while True:
print(await c.get_updates(offset=0, timeout=60))
Но если запустить такой код, мы будем получать последние сообщения из Телеграма в бесконечном цикле. А чтобы получать только новые, нужно использовать параметр offset
.
async def run_echo():
c = TgClient(os.getenv("BOT_TOKEN"))
offset = 0
while True:
res = await c.get_updates(offset=offset, timeout=60)
for item in res["result"]:
offset = item["update_id"] + 1
print(item)
Правило Телеграма: после получения новой пачки сообщений нужно взять из ответа параметр update_id
и следующий запрос выполнять с offset
на единицу больше, чем последнее сообщение, пришедшее из get_updates
.
Теперь можно завершить написание полноценного echo-бота:
async def run_echo():
c = TgClient(os.getenv("BOT_TOKEN"))
offset = 0
while True:
res = await c.get_updates_in_objects(offset=offset, timeout=60)
for item in res.result:
offset = item.update_id + 1
await c.send_message(item.message.chat.id, item.message.text)
В функции заменили get_updates
на get_updates_in_objects
, потому что гораздо удобнее оперировать объектами, чем словарями.
Текущая реализация имеет большой недостаток — бот не работает параллельно: после получения обновления он сразу начинает выполнять бизнес-логику бота. В нашем случае он отправляет echo-сообщение, и в это время новые сообщения от бота получаться не будут. Получается, другие пользователи простаивают. Нужно как-то организовать параллельную обработку пользователей и обеспечить возможность масштабирования.
Поэтому мы пойдем другим путем.
Архитектура бота
Введем сущность poller. Он будет получать сообщения из Телеграма и ставить их в очередь, никакую бизнес-логику он не реализует. Он должен быть в единственном экземпляре.
Введем сущность worker. Он будет выполнять все рабочие задачи. worker берет задачу из очереди и каким-то образом выполняет ее.
Сущностей worker может быть много:
Такая схема лучше изначальной по двум причинам:
при возрастании нагрузки мы можем соответственно увеличить количество worker;
сообщения от пользователей обрабатываются параллельно.
Реализация схемы
poller.py
import asyncio
from clients.tg import TgClient
class Poller:
def __init__(self, token: str, queue: asyncio.Queue):
self.tg_client = TgClient(token)
self.queue = queue
async def _worker(self):
offset = 0
while True:
res = await self.tg_client.get_updates_in_objects(offset=offset, timeout=60)
for u in res.result:
offset = u.update_id + 1
print(u)
self.queue.put_nowait(u)
async def start(self):
asyncio.create_task(self._worker())
poller в точности повторяет логику echo-бота, за исключением отправки echo-сообщения:
работает бесконечно;
получает уведомления из Телеграма;
кладет сообщения в очередь.
Логика получения уведомлений описана в методе _worker
, для запуска получения уведомлений нужно запустить именно его. Но просто вызвать await self._worker()
не получится, потому что мы заблокируем основной поток выполнения, а нам еще нужно запустить worker, который будет вычитывать сообщения из очереди. Поэтому нужно запустить фоновую задачу с помощью asyncio.create_task.
Теперь рассмотрим, как запустить poller.
import asyncio
import os
from bot.poller import Poller
async def start():
q = asyncio.Queue()
poller = Poller(os.getenv("BOT_TOKEN"), q)
await poller.start()
def run():
loop = asyncio.get_event_loop()
try:
print('bot has been started')
loop.create_task(start())
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
run()
Так как бот должен работать бесконечно, то необходимо организовать бесконечный цикл. В echo-боте мы просто оставили while True
, но в текущей реализации так сделать будет неудобно, поэтому лучше использовать метод запуска run_forever
, предварительно положив все необходимые задачи в event loop с помощью метода create_task.
Документация по методу run_forever
Документация по методу create_task
Обратите внимание:
1. В коде poller используется
asyncio.create_task
, а при его запуске используетсяloop.create_task
.
Отличие заключается в том, что мы явно указали, какойevent loop
нужно использовать в синхронной функцииdef run
. В асинхронных функцияхasync def loop
явно можно не указывать, потому что Python сам знает текущий цикл событий и прикрепляет задачу к нему. Функция, из которой запускаетсяcreate_task,
тоже запущена в этомloop
2. Если в запущенной в фоне корутине
create_task
происходит исключение, мы можем не увидеть его сразу, только после остановкиevent loop
. Из-за этого могут возникнуть сложности в нахождении ошибок. Почитать подробнее в этой статье.
worker.py
import asyncio
from clients.tg import TgClient
from clients.tg.dcs import UpdateObj
class Worker:
def __init__(self, token: str, queue: asyncio.Queue, concurrent_workers: int):
self.tg_client = TgClient(token)
self.queue = queue
self.concurrent_workers = concurrent_workers
async def handle_update(self, upd: UpdateObj):
print("before", upd)
await asyncio.sleep(1)
print("after", upd)
async def _worker(self):
while True:
upd = await self.queue.get()
await self.handle_update(upd)
async def start(self):
for _ in range(self.concurrent_workers):
asyncio.create_task(self._worker())
У worker есть несколько кардинальных отличий от poller:
для обеспечения параллельной обработки входящих сообщений запускается несколько
_worker
, а количество параллельных воркеров регулируется параметромconcurrent_workers
;новые сообщения приходят не из Telegram, а из очереди, которую предварительно заполнил poller;
внутри
handle_update
— который запускается при появлении нового сообщения в очереди — реализуется бизнес-логика обработки сообщения, т.е. бизнес-логика бота.
Теперь добавим запуск worker в корутину async def start.
Обратите внимание, что очередь должна быть общая между poller и worker:
async def start():
q = asyncio.Queue()
poller = Poller(os.getenv("BOT_TOKEN"), q)
await poller.start()
worker = Worker(os.getenv("BOT_TOKEN"), q, 2)
await worker.start()
На этом минимальная реализация бота готова, но есть нюансы, которые стоит улучшить:
в корутине
start
мы оперируем внутренними компонентами бота: очередью, poller, worker. Было бы хорошо иметь сущность Bot с одним методом “start”;при остановке бота он завершается моментально, не дожидаясь выполнения запущенной логики бота. Поэтому может быть такое, что мы прервем пользовательский сценарий на середине.
Class Bot
Всю работу с компонентами бота вынесем в отдельный класс Bot:
import asyncio
from bot.poller import Poller
from bot.worker import Worker
class Bot:
def __init__(self, token: str, n: int):
self.queue = asyncio.Queue()
self.poller = Poller(token, self.queue)
self.worker = Worker(token, self.queue, n)
async def start(self):
await self.poller.start()
await self.worker.start()
И перепишем функцию run
:
import asyncio
import os
from bot.base import Bot
def run():
loop = asyncio.get_event_loop()
bot = Bot(os.getenv("BOT_TOKEN"), 2)
try:
print('bot has been started')
loop.create_task(bot.start())
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
run()
Код запуска стал чище, теперь не нужно думать про внутренние компоненты бота. Достаточно запустить bot.start(), и бот начнет функционировать
Остановка бота
Сейчас остановка происходит при нажатии Ctrl + C. Возникает исключение KeyboardInterrupt
, которое мы ловим и молча завершаем работу бота:
try:
print('bot has been started')
loop.create_task(bot.start())
loop.run_forever()
except KeyboardInterrupt:
pass
Почему нужно делать красивое завершение (graceful shutdown):
1. Бизнес-логика бота может прерваться посередине, и для пользователя это будет выглядеть багом.
Пример: пользователь отправил файл боту, бот отправил сообщение, что файл загружается, и в этот момент его остановили. Бот будет загружать файл вечно, а пользователь останется в недоумении.
2. Если в боте есть подключения к другим компонентам, например, к базе данных или очереди, их нужно корректно завершать.
Поэтому введем функцию stop
в Bot и каждый внутренний компонент. Она будет отвечать за корректное завершение. Начнем с poller.
Чтобы отменить запущенную задачу, нужно вызвать у нее метод cancel
. https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
class Poller:
def __init__(self, token: str, queue: asyncio.Queue):
self.tg_client = TgClient(token)
self.queue = queue
self._task: Optional[Task] = None
async def _worker(self): ...
async def start(self):
self._task = asyncio.create_task(self._worker())
async def stop(self):
self._task.cancel()
Для этого введем переменную _task,
в которую сохраним объект созданной задачи, а в момент остановки poller вызовем cancel
.
В случае с worker нельзя просто взять и вызвать cancel
у всех запущенных задач:
нужно обработать все задачи, которые poller положил в очередь, иначе обновления из Телеграма просто потеряются;
нельзя прерывать обработку конкретной задачи, т.е. у запущенной задачи cancel вызвать нельзя.
Получается, нужно дождаться выполнения всех задач, которые находятся в очереди. Для такой задачи есть метод join
.
https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.join
class Worker:
def __init__(self, token: str, queue: asyncio.Queue, concurrent_workers: int):
...
self._tasks: List[asyncio.Task] = []
async def handle_update(self, upd: UpdateObj):
...
async def _worker(self):
while True:
try:
upd = await self.queue.get()
await self.handle_update(upd)
finally:
self.queue.task_done()
async def start(self):
self._tasks = [asyncio.create_task(self._worker()) for _ in range(self.concurrent_workers)]
async def stop(self):
await self.queue.join()
for t in self._tasks:
t.cancel()
Давайте разберемся, что мы сделали:
в методе
start
сохранили все запущенные задачи вself._tasks
;внутри метода
stop
перед вызовом методаcancel
у всех задач дождались, когда все задачи из очереди будут выполнены с помощьюawait self.queue.join()
;с помощью
self.queue.task_done()
помечаем «выполненными» задачи внутри метода_worker
.
В итоге получается:
остановили poller, новые задачи не добавляются в очередь.
внутри worker ждем, пока выполнятся все задачи
(self.queue.join())
, и только после этого вызываем отмену воркеров.так как все задачи завершились, а новые не поступают — poller остановлен — то можно вызывать
cancel
у задач и не бояться прервать бизнес-логику бота.
Теперь добавим в Bot метод stop
:
async def stop(self):
await self.poller.stop()
await self.worker.stop()
И после возникновения исключения KeyboardInterrupt
запустим остановку бота с помощью loop.run_until_complete.
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_until_complete
try:
print('bot has been started')
loop.create_task(bot.start())
loop.run_forever()
except KeyboardInterrupt:
print("\nstopping", datetime.datetime.now())
loop.run_until_complete(bot.stop())
print('bot has been stopped', datetime.datetime.now())
Чтобы проверить, что мы все сделали правильно, можно добавить asyncio.sleep(10)
в метод handle_update
, отправить боту сообщение и попробовать завершить бота раньше, чем через 10 секунд. В итоге должна получиться подобная картина:
Заключение
Цель этой статьи — показать на примере, как работать с базовым асинхронным программированием и примитивами синхронизации asyncio, а задача написания бота хорошо подходит для этих целей. Исходный код.
Написать такого бота — одно из домашних заданий курса «Асинхронное программирование» в нашей школе Metaclass.
11-го апреля у нас стартует третий поток курса. Если хотите разобраться в этой теме, записывайтесь по ссылке выше.
Чтобы быть в курсе новых потоков, вступайте в чат школы в Телеграме: https://t.me/kts_dev
Комментарии (7)
mixsture
15.03.2022 17:49+1сообщения от пользователей обрабатываются параллельно.
Каким образом? насколько я помню, для этого либо мультипроцессинг нужен, либо выпилить GIL. Асинхронность же — это совсем не параллельно.alexopryshko Автор
15.03.2022 18:01+1Когда выполняется io операция можно принять сообщение от другого пользователя и обработать его. А так как в боте много работы с API, то получается значительный прирост в скорости
mixsture
15.03.2022 19:57+6Да это я все понимаю. Но термин «параллельно» имеет вполне устоявшееся применение и не очень-то сюда подходит. То, о чем вы говорите — это скорее неблокируемый ввод-вывод — мол, пока ждем долгой io операции, можем заняться чем-то еще.
avtozavodetz
17.03.2022 00:07Правильный термин в данном случае - конкурентность (concurrency), а не параллелизм. Т.е. можно сказать, что сообщения будут обрабатываться асинхронно, или что они будут обрабатываться конкурентно, но точно не параллельно, т.к. GIL пока никто не отменял.
NikColonel
16.03.2022 16:56+1Круто увидеть у кого то похожее решение какой то проблемы) Такую же параллельную обработку очереди делал на java. В моем случае использовалась библиотека https://github.com/rubenlagus/TelegramBots и она тоже все сообщения от всех пользователей обрабатывает последовательно.
Есть один тонкий момент в обработке одной очереди: вы можете обработать два запроса от одного пользователя одновременно или не в порядке их получения, что приводит к нарушению бизнес логики. Например пользователь нажимает по очереди разные inline button, пока бот "подвис".
Как я понял, ваш пример этому подвержен. В моем проекте, на каждый "worker" создавалась дополнительная очередь. При появлении задачи в общей очереди, задача на основании TelegramID, закрепляется за свободной очередью "worker`a" и с этого времени этот "worker" обрабатывает только задачи этого TelegramID в порядке их появления. "worker" освобождается, когда его очередь кончается.
Так мне удалось решить эту проблему в моем случае.
Malomalsky
Што, опять?