Привет! Меня зовут Александр, я руководитель бэкенд-разработки в KTS.

Сегодня я покажу, как написать своего Телеграм-бота на основе asyncio и aiohttp. Мы не будем использовать ни aiogram, ни любые другие библиотеки, а напишем бота с нуля, чтобы немного познакомиться с асинхронным программированием, корутинами и некоторыми примитивами синхронизации.

Что будет в статье:

Основы ботоведения

Бот — сущность в мессенджере.

Они есть практически везде: в Телеграме, Инстаграме, ВК. Это программа, которая на основе определенных алгоритмов отвечает на сообщения пользователей. Задачи и направления самые разные: бот может просто присылать сообщения «С добрым утром!» А может, например, управлять участниками внутри чата. 

Перед началом работы с ботом его нужно создать и познакомиться с Telegram API.

Шаг 1. Сначала создаем бота с помощью BotFather внутри Телеграма:
https://core.telegram.org/bots#6-botfather

После создания вы получите сообщение:

Done! Congratulations on your new bot... 
Use this token to access the HTTP API: 
<token> 
...

В сообщении будет токен, который нужно использовать для запросов в Телеграм.

Шаг 2. Выполните первый запрос к Telegram API по инструкции: https://core.telegram.org/bots/api#authorizing-your-bot

В ответ вы должны получить что-то подобное:

{"ok":true,"result":{"id":2065163148,"is_bot":true,"first_name":"Metaclass","username":"metaclassbot","can_join_groups":true,"can_read_all_group_messages":false,"supports_inline_queries":false}}

Шаг 3. Далее нужно научиться получать уведомления из Телеграма. Существует 2 способа:

  • webhook — инициатором запроса выступает Телеграм.
    Когда пользователь пишет боту, Телеграм делает запрос на URL, который вы установите с помощью метода setwebhook. У этого метода есть недостатки: его трудно отлаживать, так как должен быть публичный адрес бота. Также на каждое сообщение выполняется HTTP-запрос, и при 1000+ сообщений в секунду серверы не справятся.

  • long polling — инициатором является ваше приложение.
    Оно обращается к Telegram API и получает уведомления или ожидает, если уведомлений нет — отсюда название long.

Мы для получения уведомлений будем использовать long polling. Описание метода.

  1. Выполните в браузере такой запрос:
    https://api.telegram.org/bot<token>/getUpdates
    Он завершится моментально и вернет:
    {"ok":true,"result":[]}

  2. Теперь выполните другой запрос:
    https://api.telegram.org/bot<token>/getUpdates?timeout=30

    1. Он будет висеть 30 секунд и, если ничего не написать боту, вернет:
      {"ok":true,"result":[]}

    2. Если написать боту хоть что-то, он моментально вернет Update.

Шаг 4. Теперь давайте научимся отправлять сообщение пользователю в ответ. Сначала отправим сообщение боту, для чего используем метод sendMessage. Метод принимает два обязательных параметра:

  • chat_id — поле, которое пришло в объекте Update. Может быть как id персонального чата, так и id группового чата.

  • text

Выполните в браузере запрос. chat_id нужно получить из предыдущего запроса: https://api.telegram.org/bot<token>/sendMessage?chat_id=85364161&text=hello.

В результате бот напишет вам сообщение hello.

Для работы с методами Telegram API будем использовать класс TgClient.

Echo-бот

Для начала давайте получим сообщения из Telegram. Для этого нужно вызвать метод get_updates класса TgClient:

import asyncio
import os

from clients.tg import TgClient


async def run_echo():
   c = TgClient(os.getenv("BOT_TOKEN"))
   print(await c.get_updates(offset=0, timeout=5))


if __name__ == "__main__":
   asyncio.run(run_echo())

Обратите внимание, что код получает токен бота из переменной окружения, поэтому перед запуском нужно установить значение переменной BOT_TOKEN .

При запуске кода может быть два исхода:

  1. Если боту отправляли сообщение не более 24 часов назад, метод get_updates вернет все последние сообщения. Чтобы было проще, далее по тексту я буду использовать термин «новые сообщения», а не «новые объекты update».

{'ok': True, 'result': [{'update_id': 503972397, 'message': {'message_id': 335, 'from': {'id': 85364161, 'is_bot': False, 'first_name': 'Alexander', 'last_name': 'Opryshko', 'username': 'alexopryshko', 'language_code': 'en'}, 'chat': {'id': 85364161, 'first_name': 'Alexander', 'last_name': 'Opryshko', 'username': 'alexopryshko', 'type': 'private'}, 'date': 1644871206, 'text': '1'}}]}

  1. Если боту не писали вообще, или писали давно, он зависнет на 5 секунд (timeout=5) и вернет пустой список:

    {'ok': True, 'result': []}

    при этом, если боту написать во время ожидания, он моментально вернет результат, как и в первом случае.

Чтобы получать сообщения из Телеграма постоянно, в цикле нужно вызывать get_updates, так как он завершается сразу после написания боту нового сообщения.

async def run_echo():
   c = TgClient(os.getenv("BOT_TOKEN"))
   while True:
       print(await c.get_updates(offset=0, timeout=60))

Но если запустить такой код, мы будем получать последние сообщения из Телеграма в бесконечном цикле. А чтобы получать только новые, нужно использовать параметр offset.

async def run_echo():
   c = TgClient(os.getenv("BOT_TOKEN"))
   offset = 0
   while True:
       res = await c.get_updates(offset=offset, timeout=60)
       for item in res["result"]:
           offset = item["update_id"] + 1
           print(item)

Правило Телеграма: после получения новой пачки сообщений нужно взять из ответа параметр update_id и следующий запрос выполнять с offset на единицу больше, чем последнее сообщение, пришедшее из get_updates.

Теперь можно завершить написание полноценного echo-бота:

async def run_echo():
   c = TgClient(os.getenv("BOT_TOKEN"))
   offset = 0
   while True:
       res = await c.get_updates_in_objects(offset=offset, timeout=60)
       for item in res.result:
           offset = item.update_id + 1
           await c.send_message(item.message.chat.id, item.message.text)

В функции заменили get_updates на get_updates_in_objects, потому что гораздо удобнее оперировать объектами, чем словарями.

Текущая реализация имеет большой недостаток — бот не работает параллельно: после получения обновления он сразу начинает выполнять бизнес-логику бота. В нашем случае он отправляет echo-сообщение, и в это время новые сообщения от бота получаться не будут. Получается, другие пользователи простаивают. Нужно как-то организовать параллельную обработку пользователей и обеспечить возможность масштабирования.

Поэтому мы пойдем другим путем.

Архитектура бота

Введем сущность poller. Он будет получать сообщения из Телеграма и ставить их в очередь, никакую бизнес-логику он не реализует. Он должен быть в единственном экземпляре.

Введем сущность worker. Он будет выполнять все рабочие задачи. worker берет задачу из очереди и каким-то образом выполняет ее.

Сущностей worker может быть много:

Такая схема лучше изначальной по двум причинам:

  • при возрастании нагрузки мы можем соответственно увеличить количество worker; 

  • сообщения от пользователей обрабатываются параллельно.

Реализация схемы

poller.py

import asyncio

from clients.tg import TgClient


class Poller:
   def __init__(self, token: str, queue: asyncio.Queue):
       self.tg_client = TgClient(token)
       self.queue = queue

   async def _worker(self):
       offset = 0
       while True:
           res = await self.tg_client.get_updates_in_objects(offset=offset, timeout=60)
           for u in res.result:
               offset = u.update_id + 1
               print(u)
               self.queue.put_nowait(u)

   async def start(self):
       asyncio.create_task(self._worker())

poller в точности повторяет логику echo-бота, за исключением отправки echo-сообщения:

  • работает бесконечно;

  • получает уведомления из Телеграма;

  • кладет сообщения в очередь.

Логика получения уведомлений описана в методе _worker, для запуска получения уведомлений нужно запустить именно его. Но просто вызвать await self._worker() не получится, потому что мы заблокируем основной поток выполнения, а нам еще нужно запустить worker, который будет вычитывать сообщения из очереди. Поэтому нужно запустить фоновую задачу с помощью asyncio.create_task.

Теперь рассмотрим, как запустить poller.

import asyncio
import os

from bot.poller import Poller


async def start():
   q = asyncio.Queue()
   poller = Poller(os.getenv("BOT_TOKEN"), q)
   await poller.start()


def run():
   loop = asyncio.get_event_loop()

   try:
       print('bot has been started')
       loop.create_task(start())
       loop.run_forever()
   except KeyboardInterrupt:
       pass


if __name__ == '__main__':
   run()

Так как бот должен работать бесконечно, то необходимо организовать бесконечный цикл. В echo-боте мы просто оставили while True, но в текущей реализации так сделать будет неудобно, поэтому лучше использовать метод запуска run_forever, предварительно положив все необходимые задачи в event loop с помощью метода create_task.
Документация по методу run_forever
Документация по методу create_task

Обратите внимание:

1. В коде poller используется asyncio.create_task , а при его запуске используется loop.create_task.
Отличие заключается в том, что мы явно указали, какой event loop нужно использовать в синхронной функции def run. В асинхронных функциях async def loop явно можно не указывать, потому что Python сам знает текущий цикл событий и прикрепляет задачу к нему.  Функция, из которой запускается create_task, тоже запущена в этом loop

2. Если в запущенной в фоне корутине create_task происходит исключение, мы можем не увидеть его сразу, только после остановки event loop. Из-за этого могут возникнуть сложности в нахождении ошибок. Почитать подробнее в этой статье.

worker.py

import asyncio

from clients.tg import TgClient
from clients.tg.dcs import UpdateObj


class Worker:
   def __init__(self, token: str, queue: asyncio.Queue, concurrent_workers: int):
       self.tg_client = TgClient(token)
       self.queue = queue
       self.concurrent_workers = concurrent_workers

   async def handle_update(self, upd: UpdateObj):
       print("before", upd)
       await asyncio.sleep(1)
       print("after", upd)

   async def _worker(self):
       while True:
           upd = await self.queue.get()
           await self.handle_update(upd)

   async def start(self):
       for _ in range(self.concurrent_workers):
           asyncio.create_task(self._worker())

У worker есть несколько кардинальных отличий от poller:

  • для обеспечения параллельной обработки входящих сообщений запускается несколько _worker, а количество параллельных воркеров регулируется параметром  concurrent_workers ;

  • новые сообщения приходят не из Telegram, а из очереди, которую предварительно заполнил poller;

  • внутри handle_update — который запускается при появлении нового сообщения в очереди — реализуется бизнес-логика обработки сообщения, т.е. бизнес-логика бота.

Теперь добавим запуск worker в корутину async def start. Обратите внимание, что очередь должна быть общая между poller и worker:

async def start():
   q = asyncio.Queue()
   poller = Poller(os.getenv("BOT_TOKEN"), q)
   await poller.start()
   worker = Worker(os.getenv("BOT_TOKEN"), q, 2)
   await worker.start()

На этом минимальная реализация бота готова, но есть нюансы, которые стоит улучшить:

  • в корутине start мы оперируем внутренними компонентами бота: очередью, poller, worker. Было бы хорошо иметь сущность Bot с одним методом “start”;

  • при остановке бота он завершается моментально, не дожидаясь выполнения запущенной логики бота. Поэтому может быть такое, что мы прервем пользовательский сценарий на середине.

Class Bot

Всю работу с компонентами бота вынесем в отдельный класс Bot:

import asyncio

from bot.poller import Poller
from bot.worker import Worker


class Bot:
   def __init__(self, token: str, n: int):
       self.queue = asyncio.Queue()
       self.poller = Poller(token, self.queue)
       self.worker = Worker(token, self.queue, n)

   async def start(self):
       await self.poller.start()
       await self.worker.start()

И перепишем функцию run:

import asyncio
import os

from bot.base import Bot


def run():
   loop = asyncio.get_event_loop()

   bot = Bot(os.getenv("BOT_TOKEN"), 2)
   try:
       print('bot has been started')
       loop.create_task(bot.start())
       loop.run_forever()
   except KeyboardInterrupt:
       pass


if __name__ == '__main__':
   run()

Код запуска стал чище, теперь не нужно думать про внутренние компоненты бота. Достаточно запустить bot.start(), и бот начнет функционировать

Остановка бота

Сейчас остановка происходит при нажатии Ctrl + C. Возникает исключение KeyboardInterrupt, которое мы ловим и молча завершаем работу бота:

try:
   print('bot has been started')
   loop.create_task(bot.start())
   loop.run_forever()
except KeyboardInterrupt:
   pass

Почему нужно делать красивое завершение (graceful shutdown):

1. Бизнес-логика бота может прерваться посередине, и для пользователя это будет выглядеть багом.
Пример: пользователь отправил файл боту, бот отправил сообщение, что файл загружается, и в этот момент его остановили. Бот будет загружать файл вечно, а пользователь останется в недоумении.

2. Если в боте есть подключения к другим компонентам, например, к базе данных или очереди, их нужно корректно завершать.

Поэтому введем функцию stop в Bot и каждый внутренний компонент. Она будет отвечать за корректное завершение. Начнем с poller.

Чтобы отменить запущенную задачу, нужно вызвать у нее метод cancel. https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

class Poller:
   def __init__(self, token: str, queue: asyncio.Queue):
       self.tg_client = TgClient(token)
       self.queue = queue
       self._task: Optional[Task] = None

   async def _worker(self): ...

   async def start(self):
       self._task = asyncio.create_task(self._worker())

   async def stop(self):
       self._task.cancel()

Для этого введем переменную _task, в которую сохраним объект созданной задачи, а в момент остановки poller вызовем cancel

В случае с worker нельзя просто взять и вызвать cancel у всех запущенных задач:

  • нужно обработать все задачи, которые poller положил в очередь, иначе обновления из Телеграма просто потеряются;

  • нельзя прерывать обработку конкретной задачи, т.е. у запущенной задачи cancel вызвать нельзя.

Получается, нужно дождаться выполнения всех задач, которые находятся в очереди. Для такой задачи есть метод join.
https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.join

class Worker:
   def __init__(self, token: str, queue: asyncio.Queue, concurrent_workers: int):
       ...
       self._tasks: List[asyncio.Task] = []

   async def handle_update(self, upd: UpdateObj):
       ...

   async def _worker(self):
       while True:
           try:
               upd = await self.queue.get()
               await self.handle_update(upd)
           finally:
               self.queue.task_done()

   async def start(self):
       self._tasks = [asyncio.create_task(self._worker()) for _ in range(self.concurrent_workers)]

   async def stop(self):
       await self.queue.join()
       for t in self._tasks:
           t.cancel()

Давайте разберемся, что мы сделали:

  • в методе start сохранили все запущенные задачи в self._tasks;

  • внутри метода stop перед вызовом метода cancel у всех задач дождались, когда все задачи из очереди будут выполнены с помощью await self.queue.join();

  • с помощью  self.queue.task_done()помечаем «выполненными» задачи внутри метода _worker.

В итоге получается:

  1. остановили poller, новые задачи не добавляются в очередь.

  2. внутри worker ждем, пока выполнятся все задачи (self.queue.join()), и только после этого вызываем отмену воркеров.

  3. так как все задачи завершились, а новые не поступают — poller остановлен — то можно вызывать cancel у задач и не бояться прервать бизнес-логику бота.

Теперь добавим в Bot метод stop:

async def stop(self):
   await self.poller.stop()
   await self.worker.stop()

И после возникновения исключения KeyboardInterrupt запустим остановку бота с помощью loop.run_until_complete.
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_until_complete

try:
   print('bot has been started')
   loop.create_task(bot.start())
   loop.run_forever()
except KeyboardInterrupt:
   print("\nstopping", datetime.datetime.now())
   loop.run_until_complete(bot.stop())
   print('bot has been stopped', datetime.datetime.now())

Чтобы проверить, что мы все сделали правильно, можно добавить asyncio.sleep(10) в метод  handle_update, отправить боту сообщение и попробовать завершить бота раньше, чем через 10 секунд. В итоге должна получиться подобная картина:

Заключение

Цель этой статьи — показать на примере, как работать с  базовым асинхронным программированием и примитивами синхронизации asyncio, а задача написания бота хорошо подходит для этих целей. Исходный код.


Написать такого бота — одно из домашних заданий курса «Асинхронное программирование» в нашей школе Metaclass.

11-го апреля у нас стартует третий поток курса. Если хотите разобраться в этой теме, записывайтесь по ссылке выше.

Чтобы быть в курсе новых потоков, вступайте в чат школы в Телеграме: https://t.me/kts_dev

Комментарии (7)


  1. Malomalsky
    15.03.2022 17:25
    +1

    Што, опять?


  1. mixsture
    15.03.2022 17:49
    +1

    сообщения от пользователей обрабатываются параллельно.

    Каким образом? насколько я помню, для этого либо мультипроцессинг нужен, либо выпилить GIL. Асинхронность же — это совсем не параллельно.


    1. alexopryshko Автор
      15.03.2022 18:01
      +1

      Когда выполняется io операция можно принять сообщение от другого пользователя и обработать его. А так как в боте много работы с API, то получается значительный прирост в скорости


      1. mixsture
        15.03.2022 19:57
        +6

        Да это я все понимаю. Но термин «параллельно» имеет вполне устоявшееся применение и не очень-то сюда подходит. То, о чем вы говорите — это скорее неблокируемый ввод-вывод — мол, пока ждем долгой io операции, можем заняться чем-то еще.


      1. avtozavodetz
        17.03.2022 00:07

        Правильный термин в данном случае - конкурентность (concurrency), а не параллелизм. Т.е. можно сказать, что сообщения будут обрабатываться асинхронно, или что они будут обрабатываться конкурентно, но точно не параллельно, т.к. GIL пока никто не отменял.


  1. nickwaze
    16.03.2022 10:57
    +1

    Зачем изобретать велосипед, когда есть прекрасный aiogram


  1. NikColonel
    16.03.2022 16:56
    +1

    Круто увидеть у кого то похожее решение какой то проблемы) Такую же параллельную обработку очереди делал на java. В моем случае использовалась библиотека https://github.com/rubenlagus/TelegramBots и она тоже все сообщения от всех пользователей обрабатывает последовательно.

    Есть один тонкий момент в обработке одной очереди: вы можете обработать два запроса от одного пользователя одновременно или не в порядке их получения, что приводит к нарушению бизнес логики. Например пользователь нажимает по очереди разные inline button, пока бот "подвис".

    Как я понял, ваш пример этому подвержен. В моем проекте, на каждый "worker" создавалась дополнительная очередь. При появлении задачи в общей очереди, задача на основании TelegramID, закрепляется за свободной очередью "worker`a" и с этого времени этот "worker" обрабатывает только задачи этого TelegramID в порядке их появления. "worker" освобождается, когда его очередь кончается.
    Так мне удалось решить эту проблему в моем случае.