Не так давно в Telegram вышло большое обновление - "Telegram для бизнеса". В данный момент оно доступно для Premium-пользователей, а в будущем, вероятно, станет отдельным режимом.
"Telegram для бизнеса" предоставляет собой новый способ взаимодействия с клиентами через Telegram, вводя для этого новые функции:
Адрес - Позволяет указать адрес и геопозицию в профиле.
Часы работы - Позволяет указать график работы бизнеса.
Быстрые ответы - Позволяет создать набор "шаблонных" ответов.
Приветствия - Позволяет установить автоматическое приветствие для новых клиентов.
"Нет на месте" - Позволяет отправлять автоматические ответы, в нерабочее время.
Ссылки на чат - Позволяет кастомизировать ссылки на чат с вами.
Вид нового чата - Позволяет кастомизировать вид чата для клиента, который открыл чат с вами, но ещё не написал сообщение.
Чат-боты - Позволяет подключить к учётной записи бота для взаимодействия с клиентами в личных чатах.
Из всего этого набора нас интересует только два пункта: Чат-боты и Часы работы.
Что мы с вами сделаем?
В этом посте мы создадим Telegram-бота, который будет принимать личные сообщения только в нерабочее время и для ответа использовать ChatGPT от OpenAI.
Поскольку OpenAI недоступен на территории РФ, вместо него будем использовать сервис NeuroAPI. Он предоставляет доступ к OpenAI из России и СНГ по более низким ценам.
Как это можно использовать?
Описанный в посте бот можно будет использовать как частному лицу, сделав личного ассистента на время отсутствия в сети, так и бизнесу для взаимодействия с клиентами в нерабочее время.
Главная сложность будет заключаться в составлении грамотного "системного промта", покрывающего ваши потребности.
Подключение бота в профиле.
Для проекта вам нужен бот, как его создать рассказано в посте "AIOgram3 1.5. Регистрация бота"
После создания бота и получения токена, в интерфейсе BotFather
, выполните команду /mybots
для вывода списка всех ботов.
Выберите нужного бота.
Затем в открывшемся меню выберите пункт "Bot Settings".
В следующем меню выберите пункт "Business Mode".
Включите бизнес режим.
После того, как включили бизнес режим для бота, откройте настройки Telegram и выберите пункт "Telegram для бизнеса", а в нём пункт "Чат-боты".
В открывшемся окне в первое поле пропишите ссылку на бота t.me/mybot
или его имя @mybot
.
Готово.
Подготовка проекта.
Создайте новый проект в удобной для вас IDE и активируйте виртуальное окружение.
Если вы пользуетесь PyCharm, то виртуальное окружение создаст IDE для нового проекта.
Если вы пользуетесь VSCode, то его придётся создать вручную, выполнив следующие команды:
python -m venv .venv
# для Windows
venv\Scripts\activate.ps1 или venv\Scripts\activate.bat
# для *NIX-систем
source venv/bin/activate
В проекте используются следующие библиотеки:
aiogram
- Фреймворк для бота.pydantic-settings
- Библиотека для создания классов конфигураций.openai
- Официальная библиотека OpenAI для Python.pytz
- Библиотека для работы с часовыми поясами.httpx
- Современная библиотека для создания синхронных/асинхронных запросов.redis
- Библиотека для подключения к Redis.
Установите их, выполнив команду:
pip install -U aiogram pydantic-settings openai pytz httpx redis
Создайте файл requirements.txt
и внесите в него установленные библиотеки:
aiogram==3.6.0
pydantic-settings==2.2.1
openai==1.29.0
pytz==2024.1
httpx==0.27.0
redis==5.0.4
Далее создайте файл .env
для хранения переменных окружения.
Необходимы следующие переменные:
token
- Токен бота, полученный от BotFather.admin_id
- Telegram-id администратора.openai_key
- API-ключ полученный на сайте NeuroAPI или OpenAI.openai_base_url
- Адрес прокси-сервера для OpenAI.redis_host
- Хост для подключения к Redis. В нашем случае используется Docker compose, поэтому прописываем имя сервиса -redis
.delay
- Задержка между ответами в минутах. Об этом ниже.
Пример:
token=12345:abcd
admin_id=123456789
openai_key=sk-abcd
openai_base_url=https://lk.neuroapi.host/v1
redis_host=redis
delay=10
Также создайте файл main.py
и пакет (Python package) app
.
Файл конфигурации.
В пакете app
создайте файл settings.py
.
В нём будем получать данные из .env-файла
и определим инстанс бота и Redis.
Создайте класс Secrets
, унаследованный от BaseSettings
. Этот класс будет получать из .env-файла
данные и преобразовывать их в Python-объекты. Для этого используется библиотека pydantic-settings
.
В теле класса пропишите шесть полей с указанием типа данных:
token: str
admin_id: int
openai_key: str
openai_base_url: str
redis_host: str
delay: int
После полей, внутри класса напишите внутренний класс Config
, в котором укажите из какого файла брать данные и его кодировку:
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
Под классом создадим переменную secrets
и объявим её экземпляром класса Secrets
.
Далее создайте переменную redis_conn
, это будет экземпляр класса Redis
, в который передаём адрес хоста. Будьте внимательны во время импорта класса! Нам нужен асинхронный Redis.
redis_conn = Redis(host=secrets.redis_host)
Последней будет переменная bot
. Объявите её экземпляром класса Bot
, передав в него токен и режим форматирования сообщений.
bot = Bot(token=secrets.token, parse_mode="Markdown")
Про parse_mode
: Поскольку в ответе ChatGPT может находиться блок кода или другое форматирование, для корректного отображения его необходимо "распарсить". Передав параметр parse_mode="Markdown"
, мы сообщаем боту, что все сообщения будут с Markdown-форматированием.
Полный код файла:
from aiogram import Bot
from pydantic_settings import BaseSettings
from redis.asyncio import Redis
class Secrets(BaseSettings):
token: str
admin_id: int
openai_key: str
openai_base_url: str
redis_host: str
delay: int
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
secrets = Secrets()
redis_conn = Redis(host=secrets.redis_host)
bot = Bot(token=secrets.token, parse_mode="Markdown")
Хранилище строк.
Для хранения текстовых строк в одном месте в пакете app
создайте файл views.py
.
Этого можно и не делать. Кроме того, вариант с функциями можно заменить на получение текста из файла или иной способ.
Создайте три простые функции, которые ничего не принимают и возвращают текстровую строку:
start_bot_message
- Сообщение о запуске бота для администратора.stop_bot_message
- Сообщение об остановке бота для администратора.system_prompt
- Системный промт, описывающий поведение ChatGPT.
Код:
def start_bot_message():
return "Бот запущен"
def stop_bot_message():
return "Бот остановлен"
def system_prompt():
return """Ты бот помощник и ты должен помогать людям."""
Проверка рабочего времени.
В Telegram часы работы указываются по дням с понедельника по воскресенье. В коде же это выглядит как список объектов класса BusinessOpeningHoursInterval
.
В объекте класса BusinessOpeningHoursInterval
есть два поля: opening_minute
и closing_minute
, представленные в виде количества минут прошедших с 00:00 ближайшего понедельника, с учётом указанной временной зоны.
Необходимо получить текущее количество минут, прошедших с понедельника, и пройтись по списку, проверяя, входит ли текущее число в один из диапазонов.
Если входит, то бот будет игнорировать сообщения.
Если не входит, бот будет отвечать на сообщения.
В пакете app
, создайте новый пакет utils
. В этом пакете создайте файл opening_hours.py
.
Создайте функцию check_opening_hours
, принимающую opening_hours
- объект класса BusinessOpeningHours
.
Класс BusinessOpeningHours
содержит два поля:
time_zone_name
- Название временной зоны. Определяется в профиле Telegram при заполнении графика работы.opening_hours
- Упомянутый выше список с объектами классаBusinessOpeningHoursInterval
.
Далее создайте четыре переменные:
tz
- В ней при помощи библиотекиpytz
получаем информацию об указанной временной зоне.now
- В ней получаем текущее время с учётом временной зоны.monday_start
- В ней высчитываем время до начала понедельника.minutes_since_monday
- В ней высчитываем сколько прошло минут с начала недели.
tz = pytz.timezone(opening_hours.time_zone_name)
now = datetime.datetime.now(tz)
monday_start = now - datetime.timedelta(
days=now.weekday(),
hours=now.hour,
minutes=now.minute,
seconds=now.second,
microseconds=now.microsecond,
)
minutes_since_monday = (now - monday_start).total_seconds() / 60
Далее создайте цикл, в котором будем итерироваться по списку интервалов и проверять, входит ли текущее время в этот список.
for day in opening_hours.opening_hours:
if day.opening_minute <= minutes_since_monday <= day.closing_minute:
return False
return True
Полный код:
import datetime
import pytz
from aiogram.types import BusinessOpeningHours
def check_opening_hours(opening_hours: BusinessOpeningHours):
tz = pytz.timezone(opening_hours.time_zone_name)
now = datetime.datetime.now(tz)
monday_start = now - datetime.timedelta(
days=now.weekday(),
hours=now.hour,
minutes=now.minute,
seconds=now.second,
microseconds=now.microsecond,
)
minutes_since_monday = (now - monday_start).total_seconds() / 60
for day in opening_hours.opening_hours:
if day.opening_minute <= minutes_since_monday <= day.closing_minute:
return False
return True
Проверка входящих сообщений.
При получении входящего сообщения необходимо проверить актуальный режим работы и либо передать сообщение дальше в обработчик, либо "сбросить" его, тем самым никак не реагируя.
Для этого будем использовать миддлвари (middleware) - это так называзываемые "посредники", срабатывающие до передачи сообщения в обработчик и в зависимости от логики выполняющие различные действия, например, запись в БД, проверку аутентификации и многое другое.
В пакете app
создайте пакет middlewares
. В нём создайте файл business_middleware.py
.
В этом файле создайте класс BusinessMiddleware
, унаследованный от BaseMiddleware
.
В нём нам нужно переопределить dunder-метод
__call__
, принимающий self
, handler
, event
, data
.
Далее нам необходимо получить из текущего чата объект класса BusinessOpeningHours
.
Лирическое отступление.
В актуальной на момент написания поста версии aiogram 3.6.0
, заявлена полная поддержка Bot API 7.3
. Если обратиться к объекту чата, то там будет параметр business_opening_hours
, однако вместо желаемого объекта BusinessOpeningHours
там находится None
.
В этом посте мы применим небольшой "костыль", для решения этой проблемы.
Разработчикам aiogram
был отправлен баг-репорт. Если в будущих версиях ситуация будет исправлена, пост будет обновлён.
Конец лирического отступления.
Для получения актуального графика работы мы обратимся к API Telegram.
Используя асинхронный менеджер контекста и библиотеку httpx
, откройте асинхронный клиент для работы.
В переменную response
получаем результат GET-запроса
на сервер Telegram.
В переменной chat
получаем JSON-объект из переменной response
.
Затем в переменной full_chat
создаём экземпляр класса ChatFullInfo
, распаковав в него содержимое chat
по ключу result
. Таким образом мы преобразуем чистые JSON-данные в Python-объекты.
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.telegram.org/bot{secrets.token}/getChat?chat_id={secrets.admin_id}"
)
chat = response.json()
full_chat = ChatFullInfo(**chat["result"])
Далее в блоке if
вызываем ранее написанную функцию check_opening_hours
, передав в неё full_chat.business_opening_hours
.
Если возвращается True
, мы продолжаем.
Внутри условия создаём переменную context
, в которую присваиваем значение ключа event_context
из переменной data
.
Дальше ещё одно условие if
, в котором проверяем, что сообщение содержит business_connection_id
, т.е. является личным и что отправитель сообщения не админ, иначе бот будет реагировать и на ваши сообщения тоже.
Если условия соблюдаются, передаём сообщение дальше в обработчик.
if check_opening_hours(full_chat.business_opening_hours):
context: EventContext = data.get("event_context")
if (
context.user.id != secrets.admin_id
and context.business_connection_id
):
return await handler(event, data)
Полный код файла:
from typing import Callable, Dict, Any, Awaitable
import httpx
from aiogram import BaseMiddleware
from aiogram.dispatcher.middlewares.user_context import EventContext
from aiogram.types import TelegramObject, ChatFullInfo
from app.settings import secrets
from app.utils.opening_hours import check_opening_hours
class BusinessMiddleware(BaseMiddleware):
async def __call__(
self,
handler: CallableTelegramObject, Dict[str, Any, Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
) -> Any:
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.telegram.org/bot{secrets.token}/getChat?chat_id={secrets.admin_id}"
)
chat = response.json()
full_chat = ChatFullInfo(**chat["result"])
if check_opening_hours(full_chat.business_opening_hours):
context: EventContext = data.get("event_context")
if (
context.user.id != secrets.admin_id
and context.business_connection_id
):
return await handler(event, data)
Подключение ChatGPT.
В этой функции будем отправлять запрос к ChatGPT и возвращать полученный ответ.
В пакете utils
, создайте файл openai_actions.py
.
Создайте асинхронную функцию get_chat_completion
, принимающую message
- объект класса Message
.
В переменной http_client
определите объект класса httpx.AsyncClient
. Это объект HTTP-клиента, используя который будет произведён запрос.
В переменной client
определите объект класса AsyncOpenAI
, передав в него аргументы: api_key
, http_client
и base_url
. Это объект клиента для OpenAI.
http_client = httpx.AsyncClient(
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
)
client = AsyncOpenAI(
api_key=secrets.openai_key,
http_client=http_client,
base_url=secrets.openai_base_url,
)
Далее в переменной messages
создайте список словарей, где первый словарь – это системный промт, а второй – сообщение от пользователя:
messages = [
{"role": "system", "content": system_prompt()},
{"role": "user", "content": message.text},
]
В переменную response
создайте запрос, передав в него:
model
- Выбранная модель ChatGPT, например,gpt-3.5-turbo
,gpt-4-turbo
,gpt-4o
или любую другую поддерживаемую OpenAI.messages
- Список словарей с сообщениями.max_tokens
- Ограничение на максимальное количество токенов в ответе.temperature
- Температура в диапазоне от 0 до 1. Определяет уровень "фантазии" бота. Чем ближе число к нулю, тем более предсказуемы будут ответы и наоборот, чем ближе к единице, тем более случайными будут ответы.
И возвращаем результат запроса в обработчик:
response = await client.chat.completions.create(
model="gpt-3.5-turbo", messages=messages, max_tokens=1000, temperature=0.8
)
return response.choices[0].message.content
Полный код:
import httpx
from aiogram.types import Message
from openai import AsyncOpenAI
from app.settings import secrets
from app.views import system_prompt
async def get_chat_completion(message: Message):
http_client = httpx.AsyncClient(
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
)
client = AsyncOpenAI(
api_key=secrets.openai_key,
http_client=http_client,
base_url=secrets.openai_base_url,
)
messages = [
{"role": "system", "content": system_prompt()},
{"role": "user", "content": message.text},
]
response = await client.chat.completions.create(
model="gpt-3.5-turbo", messages=messages, max_tokens=1000, temperature=0.8
)
return response.choices[0].message.content
Задержка обработки сообщений.
Для того, чтобы пользователи не спамили и не использовали личные сообщения как "бесплатный GPT", добавим задержку в обработке сообщений.
В вашей реализации логики она может быть не нужна.
В пакете utils
создайте файл check_delay.py
, а в нём асинхронную функцию check_user_delay
, принимающую user_id
.
Тут-то нам и понадобится Redis
для хранения пользовательских ID
и времени последнего сообщения. Вы можете использовать для этого другую БД или вовсе словарь в коде, это не принципиально.
В переменную last_message_time
получаем из Redis
по user_id
время последнего сообщения, если оно есть. Если его нет - вернётся None
.
В блоке if
проверяем, что last_message_time True
(проще говоря, не None
).
Внутри блока в переменную time_since_last_message
получаем разницу между текущим временем и полученным из хранилища.
Ниже проверяем, если оно меньше указанной в .env
допустимой задержки, то возвращаем False
.
Во всех остальных случаях возвращаем True
.
Полный код:
import asyncio
from app.settings import redis_conn, secrets
async def check_user_delay(user_id: int):
last_message_time = await redis_conn.get(f"users:{user_id}")
if last_message_time:
time_since_last_message = asyncio.get_event_loop().time() - float(
last_message_time
)
if time_since_last_message < secrets.delay * 60:
return False
return True
Обработчик бизнес сообщений.
Осталось написать обработчик, в который middleware будет передавать сообщение.
В пакете app
создайте пакет handlers
, а в нём файл business_handler.py
.
В этом файле создайте асинхронную функцию handle_business_message
, принимающую message
- объект класса Message
.
В самом начале создайте блок if
, проверяющий задержку и наличие текста в сообщении (отправить могут картинку или видео, а это другая логика работы с ChatGPT).
Если условие не выполняется, то сообщение просто игнорируется.
Если условие выполнено, переходим к обработке.
В переменной answer
вызываем функцию get_chat_completion
, передав в неё message
.
Затем отвечаем пользователю полученным сообщением.
Сохраняем в Redis
время текущего сообщения.
Полный код:
import asyncio
from aiogram.types import Message
from app.settings import redis_conn
from app.utils.check_delay import check_user_delay
from app.utils.openai_actions import get_chat_completion
async def handle_business_message(message: Message):
if await check_user_delay(message.from_user.id) and message.text:
answer = await get_chat_completion(message)
await message.reply(answer)
await redis_conn.set(
f"users:{message.from_user.id}", asyncio.get_event_loop().time()
)
Обработка уведомлений о запуске/остановке бота.
Небольшое, но удобное дополнение.
В пакете handlers
создайте файл events.py
.
В нём создайте две асинхронные функции: start_bot
и stop_bot
.
В функциях отправляем сообщение администратору.
from app.settings import bot, secrets
from app import views
async def start_bot():
await bot.send_message(secrets.admin_id, views.start_bot_message())
async def stop_bot():
await bot.send_message(secrets.admin_id, views.stop_bot_message())
Основной файл.
Логику написали. Теперь осталось соединить всё вместе.
Откройте созданный ранее файл main.py
. Он должен находиться в корне проекта рядом с файлом .env
.
В нём создайте асинхронную функцию start
.
В переменной dp
объявите экземпляр класса Dispatcher
.
Далее в несколько строк зарегистрируйте middleware и обработчики:
dp = Dispatcher()
dp.update.middleware(BusinessMiddleware())
dp.startup.register(start_bot)
dp.shutdown.register(stop_bot)
dp.business_message.register(handle_business_message)
Обратите внимание на dp.business_message.register
. Регистрируется обработка business_message
, а не обычного message
.
Далее в блоке try
вызывается очистка сообщений, отправленных, когда бот был офлайн, и запуск пуллинга, а в блоке finally
выполняется остановка бота.
Вне функции в блоке if __name__ "__main__"
запускаем функцию старт.
Полный код:
import asyncio
from aiogram import Dispatcher
from aiogram.methods import DeleteWebhook
from app.handlers.business_handler import handle_business_message
from app.handlers.events import start_bot, stop_bot
from app.middlewares.business_middleware import BusinessMiddleware
from app.settings import bot
async def start():
dp = Dispatcher()
dp.update.middleware(BusinessMiddleware())
dp.startup.register(start_bot)
dp.shutdown.register(stop_bot)
dp.business_message.register(handle_business_message)
try:
await bot(DeleteWebhook(drop_pending_updates=True))
await dp.start_polling(bot)
finally:
await bot.session.close()
if __name__ "__main__":
asyncio.run(start())
Запуск бота.
Для запуска бота и Redis будем использовать Docker compose.
Сперва необходимо создать образ с ботом, для этого создайте файл Dockerfile
со следующим содержимым:
FROM python:3.11-slim
WORKDIR /code
COPY requirements.txt /code
RUN pip install --upgrade pip && pip install -r requirements.txt
COPY . /code
CMD [ "python", "./main.py" ]
В нём создаётся Docker-образ, в котором устанавливаются все зависимости из файла requirements.txt
. Затем копируются файлы проекта и выполняется команда запуска бота.
Затем создайте файл docker-compose.yaml
со следующим содержимым:
services:
bot:
build: .
restart: always
env_file:
- .env
volumes:
- .:/code
redis:
image: redis
restart: always
volumes:
- ./redis_data:/data
В нём описываются два сервиса:
Первый bot
. Указываем, что необходимо создать образ из Dockerfile
, передать в него .env-файл
и подключить текущую папку внутри контейнера.
Второй redis
. Указываем, что будет использоваться официальный образ redis
последней версии, и подключаем папку redis_data
внутри контейнера, чтобы не потерять данные.
Готово.
Запустить бота можно командой:
docker compose up -d
Пост написан для Telegram-канала "Код на салфетке". У нас также есть сайт.