Привет, мир! Меня зовут Павел, я IT инженер и руководитель службы технической поддержки.
Эта статья - вторая часть инструкции по внедрению коннектора WhatsApp и Telegram для Открытых линий CRM Bitrix24. С логикой подключения к Битрикс вы можете ознакомиться в первой части, а в этой статье мы рассмотрим логику обмена сообщениями через WhatsApp.
Обработчик WhatsApp Web
Как бы очевидно это не звучало, но первично нам понадобится обработчик WhatsApp. В ходе долгих поисков адекватного решения, выбор пал на Evolution API в виду неплохой документации относительно аналогов и "живого" комьюнити (хоть и испаноязычного).
Инструкция по развертыванию на сервере есть на официальном сайте - https://doc.evolution-api.com/v1/en/install/docker. Однако я бы рекомендовал использовать образ evoapicloud/evolution-api:v2.3.1, в котором исправлена работа с групповыми чатами. При установке, помимо обязательных переменных, в .env надо указать:
WEBHOOK_GLOBAL_ENABLED=true
WEBHOOK_GLOBAL_URL='https://<url_коннектора>/api/waweb/?api-key=XXXX'
AUTHENTICATION_API_KEY=YYY
CONFIG_SESSION_PHONE_VERSION="2.3000.1025062854"
Модуль коннектора WhatsApp в нашем веб-приложении
Создадим django приложение waweb. Далее реализуем модели в models.py. Модель для хранения данных о сервере Evolution API и его базовой конфигурации:
class Server(models.Model):
url = models.URLField(max_length=255, unique=True, verbose_name="Server URL")
api_key = models.CharField(max_length=255, verbose_name="API Key")
max_connections = models.PositiveIntegerField(default=100)
groups_ignore = models.BooleanField(default=True)
always_online = models.BooleanField(default=False)
read_messages = models.BooleanField(default=False)
def __str__(self):
return self.url
Модель для хранения данных о сессии WhatsApp:
class Session(models.Model):
session = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
server = models.ForeignKey(Server, on_delete=models.SET_NULL, related_name="sessions", null=True, blank=True)
apikey = models.CharField(max_length=255, blank=True, null=True)
instanceId = models.CharField(max_length=255, blank=True, null=True)
date_end = models.DateTimeField(null=True, blank=True)
phone = models.CharField(max_length=15, blank=True, null=True)
groups_ignore = models.BooleanField(default=True)
sms_service = models.BooleanField(default=True)
status = models.CharField(max_length=15, blank=True, null=True)
owner = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True)
app_instance = models.ForeignKey(
AppInstance, on_delete=models.SET_NULL, related_name="wawebs", null=True, blank=True)
line = models.ForeignKey(
Line,
on_delete=models.SET_NULL,
blank=True,
null=True,
related_name="wawebs",
)
def __str__(self):
return f"Session: {self.session}, Phone: {self.phone or 'Not connected'}"
Приступим к реализации api эндпоинта для обработки событий WhatsApp в файле api/views.py. Реализуем функцию, принимающую события от WhatsApp и формирующую корректный payload для Битрикс, в зависимости от типа сообщения. Таким образом, обработаем сразу сообщения из групп, медиафайлы и голосовые сообщения. Также реализуем загрузку локация через GoogleMap. Для каждого сообщения проверяется связь между Whatsapp сессией и открытой линией Битрикс, а также реализована защита от отправки повторных сообщений путем фильтрации по message_id:
from rest_framework.viewsets import GenericViewSet
from rest_framework.response import Response
from rest_framework.decorators import action
from waweb.models import Session
from rest_framework import permissions
import requests
from django.conf import settings
import redis
import logging
import uuid
import re
from django.utils import timezone
import waweb.tasks as tasks
import waweb.utils as utils
import bitrix.utils as bitrix_utils
import bitrix.tasks as bitrix_tasks
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0)
logger = logging.getLogger("django")
class EventsHandler(GenericViewSet):
def create(self, request, *args, **kwargs):
event_data = request.data
sessionid = event_data.get('instance')
if not sessionid:
return Response({'error': 'sessionId is required'})
try:
session = Session.objects.get(session=sessionid)
except Session.DoesNotExist:
return Response({'error': f'Session with sessionId {sessionid} does not exist'})
if not session.owner:
return Response({'error': 'Session has no owner'})
event = event_data.get("event")
data = event_data.get("data", {})
apikey = event_data.get('apikey')
if apikey and session.apikey != apikey:
session.apikey = apikey
session.save(update_fields=["apikey"])
server = session.server
headers = {"apikey": session.apikey}
if event == "connection.update":
state = data.get('state')
if session.status != state:
session.status = state
session.save(update_fields=["status"])
if state == "open":
wuid = data.get("wuid")
number = wuid.split("@")[0]
session.phone = number
session.save(update_fields=["phone"])
if Session.objects.exclude(pk=session.pk).filter(phone=number).exists():
headers = {"apikey": server.api_key}
response = requests.delete(f"{server.url}instance/logout/{sessionid}", headers=headers, timeout=60)
response = requests.delete(f"{server.url}instance/delete/{sessionid}", headers=headers, timeout=60)
session.delete()
return Response({'error': 'Phone number already in use, session deleted'})
elif event in ["messages.upsert", "send.message"]:
if session.date_end and timezone.now() > session.date_end:
return Response({'error': 'tariff has expired'})
message = data.get('message', {})
key_data = data.get('key', {})
message_id = key_data.get('id')
if redis_client.exists(f'waweb:{message_id}'):
return Response({'message': 'loop message'})
fromme = key_data.get('fromMe')
sender = event_data.get('sender').split('@')[0]
remoteJid = key_data.get('remoteJid')
pushName = data.get("pushName")
group_message = False
if remoteJid.endswith('@g.us'):
group_message = True
params = {"groupJid": remoteJid}
group_name = requests.get(f"{server.url}group/findGroupInfos/{sessionid}", params=params, headers=headers, timeout=60)
if group_name.status_code == 200:
pushName = group_name.json().get("subject")
file_data = {}
remoteJid = remoteJid.split('@')[0]
profilepic_url = None
if not group_message:
profilepic = requests.post(f"{server.url}chat/fetchProfilePictureUrl/{sessionid}",
json={"number": remoteJid}, headers=headers, timeout=60)
if profilepic.status_code == 200:
profilepic = profilepic.json()
profilepic_url = profilepic.get("profilePictureUrl")
payload = {
'sender': sender,
'remoteJid': remoteJid,
'fromme': fromme,
}
msg_type = data.get('messageType')
fileName = None
print(f"Message Recieved: {data}")
if msg_type == 'conversation':
if group_message:
group_sender_name = f"{data.get('pushName')}:" if data.get('pushName') else ""
payload.update({'content': f"*{pushName}*\n{group_sender_name} {message.get('conversation')}"})
else:
payload.update({'content': message.get('conversation')})
elif msg_type == 'locationMessage':
location = message.get(msg_type, {})
latitude = location.get('degreesLatitude')
longitude = location.get('degreesLongitude')
description = f"{location.get('name')}: {location.get('address')}"
body = f"Link: https://www.google.com/maps/place/{latitude},{longitude}"
if "None" not in description:
body = f"Address: {description} \n {body}"
payload.update({'content': body})
elif msg_type == 'contactMessage':
payload.update({'content': message.get(msg_type, {}).get("vcard")})
elif msg_type == 'templateMessage':
hydratedTemplate = message.get(msg_type, {}).get("hydratedTemplate", {})
hydratedTitleText = hydratedTemplate.get("hydratedTitleText")
hydratedContentText = hydratedTemplate.get("hydratedContentText")
hydratedFooterText = hydratedTemplate.get("hydratedFooterText")
payload.update({'content': f"{hydratedTitleText} \n {hydratedContentText} \n {hydratedFooterText}"})
elif msg_type in ["imageMessage", "documentMessage", "videoMessage", "audioMessage"]:
payload.update({'content': message.get(msg_type, {}).get("caption")})
media_url = f"{server.url}chat/getBase64FromMediaMessage/{sessionid}"
msg_payload = {"message": {"key": {"id": message_id}}}
response = requests.post(media_url, json=msg_payload, headers=headers, timeout=60)
if response.status_code == 201:
file_data = response.json()
file_body = file_data.get('base64')
fileName = file_data.get('fileName')
mimetype = file_data.get('mimetype')
if file_body:
from io import BytesIO
import base64
file_bytes = base64.b64decode(file_body)
file_like = BytesIO(file_bytes)
file_like.name = fileName
payload.update({'attachments': (file_like.name, file_like, mimetype)})
else:
return Response({'message': 'ok'})
try:
if session.line:
file_url = None
text = payload.get("content") or fileName
if file_data:
domain = session.line.portal.domain
chat_key = f'bitrix_chat:{domain}:{session.line.line_id}:{remoteJid}'
if redis_client.exists(chat_key):
chat_id = redis_client.get(chat_key).decode('utf-8')
chat_folder = None
try:
chat_folder = bitrix_utils.call_method(session.app_instance, "im.disk.folder.get", {"CHAT_ID": chat_id})
if isinstance(chat_folder, dict) and chat_folder.get("error"):
logger.error(chat_folder["detail"])
except Exception as e:
logger.error(f"Bitrix error: {e}")
if chat_folder and "result" in chat_folder:
bitrix_utils.call_method(session.app_instance, "imopenlines.session.join", {"CHAT_ID": chat_id})
folder_id = chat_folder.get("result").get("ID")
upload_file = bitrix_utils.upload_file(
session.app_instance, folder_id,
file_body, fileName)
if upload_file:
file_url = upload_file.get("DOWNLOAD_URL")
if fromme:
bitrix_tasks.message_add(session.app_instance.id, session.line.line_id,
remoteJid, text, session.line.connector.code)
if file_url:
file_upd = {
"CHAT_ID": chat_id,
"UPLOAD_ID": upload_file.get("FILE_ID"),
"DISK_ID": upload_file.get("ID"),
"SILENT_MODE": "Y",
"MESSAGE": fileName
}
bitrix_tasks.call_api(session.app_instance.id, "im.disk.file.commit", file_upd)
else:
attach = None
if file_url:
attach = [
{
"url": file_url,
"name": fileName
}
]
bitrix_tasks.send_messages(session.app_instance.id, remoteJid, text, session.line.connector.code, session.line.line_id,
False, pushName, message_id, attach, profilepic_url)
return Response({"message": "message processed"})
except Exception as e:
print(f'Failed to send API message: {str(e)}')
return Response({'error': f'Failed to send API message: {str(e)}'}, status=500)
return Response({'message': 'ok'})
@action(detail=False, methods=['post'], url_path=r'(?P<session>[^/.]+)/send', permission_classes=[permissions.AllowAny])
def send(self, request, session=None, *args, **kwargs):
session_id = session
if not session_id:
return Response({'error': 'session is required'})
try:
session = Session.objects.get(session=session_id)
except Exception as e:
return Response({'error': 'An error occurred', 'details': str(e)})
if session.date_end and timezone.now() > session.date_end:
return Response({'error': 'tariff has expired'}, status=402)
data = request.data
event = data.get('event')
message_type = data.get('message_type')
attachments = data.get('attachments', {})
if event == "message_created" and message_type == "outgoing":
message_id = data.get('id')
if redis_client.exists(f'chatwoot:{message_id}'):
return Response({'message': 'loop message'})
content = data.get('content')
conversation = data.get('conversation', {})
meta = conversation.get('meta', {})
sender = meta.get('sender', {})
phone_number = sender.get('phone_number')
if content:
wa_resp = utils.send_message(session_id, phone_number, content)
if wa_resp.status_code == 201:
utils.store_msg(wa_resp)
if session.line:
cleaned_phone = re.sub(r'\D', '', phone_number)
bitrix_tasks.message_add(session.app_instance.id, session.line.line_id,
cleaned_phone, content, session.line.connector.code)
if attachments:
for attachment in attachments:
tasks.send_message_task(str(session.session), [phone_number], attachment, 'media')
return Response({'message': 'All files sent successfully'})
return Response({'message': f'Session {session_id} authorized'})
Приступим к реализации функциональной части модуля. Создадим файл utils.py.
Предварительно рассмотрим импорты:
import requests
import base64
import mimetypes
import re
import redis
import time
import magic
from django.conf import settings
from .models import Session
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0)
Далее реализуем функционал работы с файлами. Функция download_file принимает url отправленного файла и id сообщения. Далее нам нужно получить mimetype файла с помощью библиотеки python-magic, определить его расширение и передать данные в формате base64:
def download_file(attachment):
data_url = attachment.get("data_url") or attachment.get("link")
message_id = attachment.get("message_id", time.time())
try:
response = requests.get(data_url, stream=True, timeout=60)
if response.status_code != 200:
raise Exception(f"Failed to download file: {response.status_code} {response.text}")
file_content = response.content
mime = magic.Magic(mime=True)
mimetype = mime.from_buffer(file_content)
extension = mimetypes.guess_extension(mimetype) or ''
filename = f"{message_id}{extension}"
base64_encoded_data = base64.b64encode(file_content).decode("utf-8")
return {
"mimetype": mimetype,
"data": base64_encoded_data,
"filename": filename
}
except Exception as e:
print(f"Error processing file: {str(e)}")
return None
Также для исключения повторных сообщений, настроим сохранение id сообщений в redis:
def store_msg(resp):
data = resp.json()
msg_data = data.get('key', {})
message_id = msg_data.get('id')
if message_id:
redis_client.setex(f'waweb:{message_id}', 600, message_id)
Реализуем непосредственно функцию работы с сообщениями. Данная функция очищает телефон от лишних символов (для корректной работы с лидами) и формирует payload в зависимости от типа сообщения:
def send_message(session_id, recipient, content, cont_type="string"):
session = Session.objects.get(session=session_id)
server = session.server
headers = {"apikey": session.apikey}
cleaned = re.sub(r'\D', '', recipient)
if cont_type == "string":
payload = {
"number": cleaned,
"text": content,
"linkPreview": True,
}
url = f"{server.url}message/sendText/{session_id}"
print("Sending WA message by url: ", url)
print("WA message payload: ", payload)
return requests.post(url, json=payload, headers=headers, timeout=60)
elif cont_type == "media":
url = f"{server.url}message/sendMedia/{session_id}"
mimetype = content.get("mimetype", "")
base_type = mimetype.split('/')[0]
mediatype = base_type if base_type in ["image"] else "document"
payload = {
"number": cleaned,
"mediatype": mediatype,
"mimetype": content.get("mimetype"),
"media": content.get("data"),
"fileName": content.get("filename")
}
return requests.post(url, json=payload, headers=headers, timeout=60)
Реализуем celery функции для асинхронной работы приложения:
import requests
from django.utils import timezone
from django.db.models import Q
from datetime import timedelta
from celery import shared_task
import waweb.utils as utils
from django.conf import settings
from waweb.models import Session
@shared_task
def send_message_task(session_id, recipients, content, cont_type="string", from_web=False):
if cont_type == "media":
content = utils.download_file(content)
for recipient in recipients:
resp = utils.send_message(session_id, recipient, content, cont_type)
if resp.status_code == 201 and not from_web:
utils.store_msg(resp)
@shared_task
def delete_sessions(days=None):
now = timezone.now()
filters = Q((Q(phone__isnull=True) | Q(phone='')) & Q(date_end__lt=now))
if days is not None:
try:
days_int = int(days)
date_limit = now - timedelta(days=days_int)
filters = filters | Q(date_end__lt=date_limit)
except (TypeError, ValueError):
pass
sessions = Session.objects.filter(filters)
for session in sessions:
server = session.server
headers = {"apikey": server.api_key}
url = f"{server.url}instance/delete/{session.session}"
requests.delete(url, headers=headers, timeout=60)
session.delete()
Далее создадим views.py, где опишем функционал подключения WhatsApp к коннектору:
import requests
import redis
import uuid
from requests.exceptions import RequestException
from django.shortcuts import render, redirect, get_object_or_404
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.db.models import Count, F
from django.utils import timezone
from django.conf import settings
from bitrix.models import AppInstance, Line, Connector
import bitrix.utils as bitrix_utils
from users.models import Message
from main.decorators import login_message_required
from .models import Session, Server
from .forms import SendMessageForm
from .tasks import send_message_task
apps = settings.INSTALLED_APPS
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0, decode_responses=True)
LINK_TTL = 60 * 60 * 24
Для начала создадим вьюшку для главной страницы модуля, в которой получим доступные сервера Evolution API и подключенные сессии (если таковые имеются).
@login_message_required(code="waweb")
def wa_sessions(request):
connector_service = "waweb"
connector = Connector.objects.filter(service=connector_service).first()
if request.method == "POST":
session_id = request.POST.get("session_id")
line_id = request.POST.get("line_id")
if not line_id:
messages.warning(request, "Необходимо выбрать линию из списка или создать новую.")
return redirect('waweb')
phone = get_object_or_404(Session, id=session_id, owner=request.user)
if not phone.phone:
messages.error(request, "Сначала необходимо подключить WhatsApp.")
return redirect('waweb')
if phone.line and str(phone.line.id) == str(line_id):
messages.warning(request, "Эта линия уже подключена к выбранной сессии.")
return redirect('waweb')
try:
bitrix_utils.connect_line(request, line_id, phone, connector, connector_service)
except Exception as e:
messages.error(request, str(e))
return redirect('waweb')
return redirect('waweb')
sessions = Session.objects.filter(owner=request.user)
instances = AppInstance.objects.filter(owner=request.user, app__connectors=connector)
wa_lines = Line.objects.filter(connector=connector, owner=request.user)
for session in sessions:
session.show_link = session.status == "open"
return render(
request, 'waweb/wa_sessions.html', {
"sessions": sessions,
"instances": instances,
"wa_lines": wa_lines,
}
)
Далее добавим функцию для подключения новых сессий, в которой получим доступный сервер Evolution API (если их несколько) и вызовем функцию создания сессии:
@login_required
def connect_number(request, session_id=None):
if not session_id:
sessions = Session.objects.filter(
phone__isnull=True,
owner=request.user
)
if sessions and not request.user.integrator:
messages.warning(request, "У вас уже есть незавершенное подключение. Нажмите 'Подключить'")
return redirect('waweb')
new_session = Session.objects.create(owner=request.user)
session_id = new_session.session
else:
new_session = get_object_or_404(Session, session=session_id, owner=request.user)
server = (
Server.objects.annotate(connected_sessions=Count('sessions'))
.filter(connected_sessions__lt=F('max_connections'))
.order_by('id')
.first()
)
if not server:
messages.error(request, "Нет доступных серверов.")
new_session.delete()
return redirect('waweb')
new_session.server = server
new_session.save()
try:
img_data = create_instance(new_session)
if img_data:
request.session['qr_image'] = img_data
return redirect('', session_id=session_id)
except Exception as e:
print("request error:", e)
new_session.delete()
messages.error(request, "Failed to initiate session.")
return redirect('waweb')
В функции create_instance отправим запрос в Evolution API на создание новой сессии и получим QR код:
def create_instance(session):
server = session.server
headers = {"apikey": server.api_key}
payload = {
"instanceName": str(session.session),
"qrcode": True,
"integration": "WHATSAPP-BAILEYS",
"alwaysOnline": server.always_online,
"groupsIgnore": server.groups_ignore,
"readMessages": server.read_messages,
}
try:
response = requests.post(f"{server.url}instance/create", json=payload, headers=headers, timeout=150)
inst_data = response.json()
instanceId = inst_data.get("instance", {}).get("instanceId")
session.instanceId = instanceId
session.save()
print(inst_data)
img_data = inst_data.get("qrcode", {}).get("base64", "")
print(img_data)
if "," in img_data:
img_data = img_data.split(",", 1)[1]
else:
raise Exception("QR code base64 data is invalid or missing.")
return img_data
except RequestException as e:
print("request error:", e)
return None
Для отрисовки используем qr_code_page:
@login_required
def qr_code_page(request, session_id):
qr_image = request.session.pop('qr_image', '')
try:
session = Session.objects.get(session=session_id)
except Session.DoesNotExist:
messages.error(request, "Session not found.")
return redirect('waweb')
if not qr_image:
qr_image = get_gr(request, session)
if not qr_image:
return redirect('waweb')
public_id = redis_client.get(f"public_qr:{session_id}")
if not public_id:
public_id = str(uuid.uuid4())
redis_client.set(f"public_qr:{session_id}", public_id, ex=LINK_TTL)
redis_client.set(f"public_qr:{public_id}", str(session_id), ex=LINK_TTL)
message = Message.objects.filter(code="waweb_instruction").first()
if message:
messages.info(request, message.message)
return render(request, 'waweb/qr_code.html', {
'qr_image': qr_image,
'request': request,
'public_id': public_id,
})
А также добавим функцию для декодирования QR кода:
def get_gr(request, session):
server = session.server
if not server:
messages.error(request, "Session is not attached to a server.")
return
if session.status == "open":
messages.warning(request, "Session is connected.")
return
gr_url = f"{server.url}instance/connect/{session.session}"
headers = {"apikey": server.api_key}
try:
response = requests.get(gr_url, headers=headers, timeout=60)
if response.status_code == 404:
return create_instance(session)
inst_data = response.json()
img_data = inst_data.get("base64", "")
if img_data:
qr_image = img_data.split(",", 1)[1]
return qr_image
else:
messages.error(request, f"Failed to restart session. {inst_data}")
return
except RequestException:
messages.error(request, "Failed connect to server")
return
Пропишем маршруты к нашим вьюшкам:
from django.urls import path
from . import views
urlpatterns = [
path('waweb/', views.wa_sessions, name='waweb'),
path('connect/', views.connect_number, name='connect_number'),
path('qr/<uuid:session_id>/', views.qr_code_page, name='qr_code_page'),
]
Модуль Telegram коннектора
Коннекторы WhatsApp и Telegram довольно схожи в своей реализации и базовой логике обработки сообщений. Однако ввиду политики открытого API, интеграция с телегой потребовала гораздо меньшего количество ресурсов. Как минимум, вам не потребуется знание испанского для баг-фиксов.
Создадим django приложение telegram. В models.py нам нужно лишь хранить данные о подключенном Telegram боте, а также выставить вебхук для пересылки сообщений:
import requests
from django.conf import settings
from django.db import models
from bitrix.models import AppInstance, Line
class TelegramBot(models.Model):
bot_token = models.CharField(max_length=255, unique=True, verbose_name="Bot Token")
bot_username = models.CharField(max_length=255, verbose_name="Bot Username", blank=True, null=True)
receive_messages = models.BooleanField(default=True)
send_typing_actions = models.BooleanField(default=True)
groups_ignore = models.BooleanField(default=True)
date_end = models.DateTimeField(null=True, blank=True)
owner = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True)
app_instance = models.ForeignKey(
AppInstance, on_delete=models.SET_NULL, related_name="telegrams", null=True, blank=True)
line = models.ForeignKey(
Line,
on_delete=models.SET_NULL,
blank=True,
null=True,
related_name="telegrams",
)
def __str__(self):
return self.bot_username
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
if self.bot_token and self.bot_username:
site_domain = self.app_instance.app.site.domain
webhook_url = f"https://{site_domain}/api/telegram/{self.bot_username}/"
api_url = f"https://api.telegram.org/bot{self.bot_token}/setWebhook"
response = requests.post(api_url, data={"url": webhook_url})
print(response.json())
if response.status_code != 200:
raise Exception(
f"Не удалось установить webhook: {response.text}"
)
Обработчик событий в api/views.py функционально идентичен обработчику WhatsApp, с небольшими поправками на структуру сообщений Telegram. Поэтому задерживаться на этом моменте мы не будем, просто приложу готовый код:
import logging
import bitrix.tasks as bitrix_tasks
import redis
import requests
import telegram.tasks as telegram_tasks
from django.conf import settings
from rest_framework import permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from telegram.models import TelegramBot
logger = logging.getLogger("django")
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0)
class TelegramEventsHandler(GenericViewSet):
permission_classes = [permissions.AllowAny]
def create(self, request, *args, **kwargs):
data = request.data
print(data)
bot_name = kwargs.get("bot_name")
message = data.get("message")
if not message:
return Response({"message": "No message"}, status=200)
try:
bot = TelegramBot.objects.get(bot_username=bot_name)
except TelegramBot.DoesNotExist:
return Response({'error': f'Bot {bot_name} does not exist'})
chat_id = message["chat"]["id"]
is_group = message["chat"]["type"] in ["group", "supergroup"]
message_id = message["message_id"]
from_user = message["from"]
username = from_user.get("username") or from_user.get("first_name")
name = from_user.get("first_name") or "" + from_user.get("last_name") or ""
user_full_name = name if name != "" else from_user.get("username")
pushName = username
if redis_client.exists(f"telegram:{message_id}"):
return Response({"message": "loop message"}, status=200)
redis_client.setex(f"telegram:{message_id}", 600, message_id)
try:
if bot.line:
group_title = message["chat"].get("title") if is_group else None
attachments = None
media_keys = [
"photo",
"document",
"video",
"audio",
"voice",
"video_note"
]
text = message.get("text", " ")
for key in media_keys:
if key in message:
attachments = []
if key == "photo":
items = [message[key][-1]]
else:
items = message[key] if isinstance(message[key], list) else [message[key]]
text = message.get("capture", " ")
for item in items:
file_id = item["file_id"]
tg_response = requests.get(
f"https://api.telegram.org/bot{bot.bot_token}/getFile?file_id={file_id}"
)
tg_response.raise_for_status()
file_info = tg_response.json().get("result", {})
file_path = file_info.get("file_path")
if not file_path:
continue
file_url = f"https://api.telegram.org/file/bot{bot.bot_token}/{file_path}"
filename = file_path.split("/")[-1]
attachments.append({
"url": file_url,
"name": filename,
"type": key
})
if is_group:
text_to_send = f"*{group_title}*\n{user_full_name}: {text}"
chat_key = f'bitrix_chat:telegram_group:{chat_id}'
redis_client.setex(chat_key, 60 * 60 * 24, chat_id)
else:
text_to_send = text
bitrix_tasks.send_messages(
bot.app_instance.id,
chat_id,
text_to_send,
bot.line.connector.code,
bot.line.line_id,
False,
pushName,
str(message_id),
attachments=attachments,
)
except Exception as e:
logger.error(f"Bitrix send error: {e}")
return Response({"message": "processed"}, status=200)
@action(detail=False, methods=['post'], url_path=r'(?P<session>[^/.]+)/send')
def send(self, request, session=None, *args, **kwargs):
"""
Send outgoing message to Telegram using session id
"""
session_id = session
if not session_id:
return Response({'error': 'session is required'})
try:
bot = TelegramBot.objects.get(id=session_id)
except Exception as e:
return Response({'error': 'An error occurred', 'details': str(e)})
if bot.date_end and timezone.now() > bot.date_end:
return Response({'error': 'tariff has expired'}, status=402)
data = request.data
event = data.get('event')
message_type = data.get('message_type')
attachments = data.get('attachments', {})
if event == "message_created" and message_type == "outgoing":
message_id = data.get('id')
if redis_client.exists(f'chatwoot:{message_id}'):
return Response({'message': 'loop message'})
redis_client.setex(f'chatwoot:{message_id}', 600, message_id)
content = data.get('content')
conversation = data.get('conversation', {})
meta = conversation.get('meta', {})
sender = meta.get('sender', {})
chat_id = sender.get('chat_id')
if not chat_id:
return Response({'error': 'chat_id not provided in sender meta'})
if content:
telegram_tasks.send_telegram_message_task.delay(
bot_token=bot.bot_token,
recipient=chat_id,
content=content,
cont_type="string"
)
if bot.line:
bitrix_tasks.message_add.delay(
bot.app_instance.id,
bot.line.line_id,
str(chat_id),
content,
bot.line.connector.code
)
if attachments:
for attachment in attachments:
telegram_tasks.send_telegram_message_task.delay(
bot_token=bot.bot_token,
recipient=chat_id,
content=attachment,
cont_type="media"
)
return Response({'message': 'All files sent successfully'})
return Response({'message': f'Session {session_id} authorized'})
Создадим асинхронную функцию для отправки сообщений в tasks.py, выполним проверку на тип сообщения (текст или медиа файл):
import base64
import os
import tempfile
import requests
from celery import shared_task
@shared_task
def send_telegram_message_task(bot_token, recipient, content, cont_type="string"):
"""
Отправляет сообщение или медиа в Telegram.
recipient: chat_id (str или int) или @username
content: строка (текст) или dict с base64-данными
cont_type: 'string' или 'media'
"""
print("send_telegram_message_task is called")
TELEGRAM_API_BASE = f"https://api.telegram.org/bot{bot_token}"
if cont_type == "string":
url = f"{TELEGRAM_API_BASE}/sendMessage"
payload = {
"chat_id": recipient,
"text": content,
"parse_mode": "Markdown", # или HTML
"disable_web_page_preview": False,
}
response = requests.post(url, json=payload, timeout=30)
response.raise_for_status()
return response.json()
elif cont_type == "media":
mimetype = content.get("mimetype", "")
base_type = mimetype.split("/")[0]
file_url = content.get("url") or content.get("link")
file_name = content.get("filename") or content.get("name", "file")
if not file_url:
raise ValueError("Media content must include 'link' or 'url'")
try:
file_response = requests.get(file_url, stream=True, timeout=30)
file_response.raise_for_status()
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
for chunk in file_response.iter_content(chunk_size=8192):
tmp_file.write(chunk)
tmp_file_path = tmp_file.name
files = {}
data = {"chat_id": recipient}
if base_type == "image":
url = f"{TELEGRAM_API_BASE}/sendPhoto"
files["photo"] = (file_name, open(tmp_file_path, "rb"), mimetype)
data["caption"] = ""
else:
url = f"{TELEGRAM_API_BASE}/sendDocument"
files["document"] = (file_name, open(tmp_file_path, "rb"), mimetype)
data["caption"] = ""
response = requests.post(url, data=data, files=files, timeout=60)
response.raise_for_status()
return response.json()
finally:
if os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
Не вижу смысла в реализации отдельных страниц для подключения Telegram бота, ввиду наличия "из коробки" прекрасной админ панели в django, поэтому создадим лишь страницу с отображением подключенных ботов:
import requests
import redis
import uuid
from requests.exceptions import RequestException
from django.shortcuts import render, redirect, get_object_or_404
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.db.models import Count, F
from django.utils import timezone
from django.conf import settings
from bitrix.models import AppInstance, Line, Connector
import bitrix.utils as bitrix_utils
from main.decorators import login_message_required
from .models import TelegramBot
apps = settings.INSTALLED_APPS
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0, decode_responses=True)
LINK_TTL = 60 * 60 * 24
@login_message_required(code="telegram")
def tg_bots(request):
connector_service = "telegram"
connector = Connector.objects.filter(service=connector_service).first()
if request.method == "POST":
session_id = request.POST.get("session_id")
line_id = request.POST.get("line_id")
if not line_id:
messages.warning(request, "Необходимо выбрать линию из списка или создать новую.")
return redirect('telegram')
bot = get_object_or_404(TelegramBot, id=session_id, owner=request.user)
if not bot.bot_username:
messages.error(request, "Сначала необходимо подключить WhatsApp.")
return redirect('telegram')
if bot.line and str(bot.line.id) == str(line_id):
messages.warning(request, "Эта линия уже подключена к выбранной сессии.")
return redirect('telegram')
try:
bitrix_utils.connect_line(request, line_id, bot, connector, connector_service)
except Exception as e:
messages.error(request, str(e))
return redirect('telegram')
return redirect('telegram')
bots = TelegramBot.objects.filter(owner=request.user)
instances = AppInstance.objects.filter(owner=request.user, app__connectors=connector)
tg_lines = Line.objects.filter(connector=connector, owner=request.user)
return render(
request, 'telegram/tg_sessions.html', {
"bots": bots,
"instances": instances,
"tg_lines": tg_lines,
}
)
На этом реализация нашего коннектора завершена, а в третьей части мы рассмотрим подключение Локальных приложений к Открытым линиям CRM Bitrix24.