Нехитрая история о том, как мне стало стыдно постоянно просить у одногруппников пропущенную информацию и я решил чуть-чуть облегчить нам жизнь.


image


Полагаю, многим моим ровесникам знакома знакома ситуация, когда в общем чате, где довольно часто мелькает важная информация, расположилось около 30 активных собеседников, постоянно нагружающих базы данных Вконтакте своими сообщениями. При таких условиях маловероятно, что эту важную информацию увидят все. Так случается и со мной. Год назад было принято решение исправить это недоразумение.


Тех, кто готов не возмущаться по поводу очередной статьи про бота, прошу под кат.


Так как являюсь студентом первого уровня, примеры будут связаны с этой тематикой.
Итак, есть задача: сделать передачу информации от старосты к студентам удобной как для старосты, так и для студентов. Благодаря относительно новым фичам Вконтакте (а именно личные сообщения сообществ) решение бросилось в глаза сразу. Бот, сидящий в группе должен принимать сообщения от старосты (старост, если на потоке много групп) и рассылать их заинтересованным лицам (студентам).


Задача поставлена, приступаем.


Нам понадобятся:


  1. библиотека vk_api для использования Vk Api
  2. peewee orm для работы с базой данных
  3. и встроенные модули python

Также перед прочтением предлагаю освежить в памяти паттерны "Наблюдатель" (хабр, вики) и "Фасад" (хабр, вики)


Часть 1. "Приятно познакомиться, товарищ бот."


Для начала следует научить нашего бота понимать себя как сообщество. Создадим класс с названием Group. В качестве аргументов пусть принимает объект сессии и объект представителя (Proxy) базы данных.


class Group(BaseCommunicateVK):

    def __init__(self, vksession, storage):
        super().__init__(vksession)
        self.storage = storage

BaseCommunicateVK? Что там?

Решение вынести эту функциональность в отдельный класс объясняется тем, что в будущем, возможно, кто-то из вас решит дополнить бота каким-нибудь другим функционалом Вконтакте.
Ну и чтобы разгрузить абстракцию сообщества, естественно.


class BaseCommunicateVK:

    longpoll = None

    def __init__(self, vksession):
        self.session = vksession
        self.api = vksession.get_api()
        if BaseCommunicateVK.longpoll is None:
            BaseCommunicateVK.longpoll = VkLongPoll(self.session)

    def get_api(self):
        return self.api

    def get_longpoll(self):
        return self.longpoll

    def method(self, func, args):
        return self.api.method(func, args)

    @staticmethod
    def create_session(token=None, login=None, password=None, api_v='5.85'):
        try:
            if token:
                session = vk_api.VkApi(token=token, api_version=api_v)

            elif login and password:
                session = vk_api.VkApi(login, password, api_version=api_v)

            else:
                raise vk_api.AuthError("Define login and password or token.")

            return session

        except vk_api.ApiError as error:
            logging.info(error)

    def get_last_message(self, user_id):

        return self.api.messages.getHistory(
            peer_id=user_id, count=1)["items"][0]

    @staticmethod
    def get_attachments(last_message):

        if not last_message or "attachments" not in last_message:
            return ""

        attachments = last_message["attachments"]
        attach_strings = []

        for attach in attachments:

            attach_type = attach["type"]
            attach_info = attach[attach_type]

            attach_id = attach_info["id"]
            attach_owner_id = attach_info["owner_id"]

            if "access_key" in attach_info:
                access_key = attach_info["access_key"]
                attach_string = "{}{}_{}_{}".format(attach_type, attach_owner_id, attach_id, access_key)

            else:
                attach_string = "{}{}_{}".format(attach_type, attach_owner_id, attach_id)

            attach_strings.append(attach_string)

        return ",".join(attach_strings)

    @staticmethod
    def get_forwards(attachments, last_message):

        if not attachments or "fwd_count" not in attachments:
            return ""

        if len(last_message["fwd_messages"]) == int(attachments["fwd_count"]):
            return last_message["id"]

    def send(self, user_id, message, attachments=None, **kwargs):
        send_to = int(user_id)

        if "last_message" in kwargs:
            last_message = kwargs["last_message"]
        else:
            last_message = None

        p_attachments = self.get_attachments(last_message)
        p_forward = self.get_forwards(attachments, last_message)

        if message or p_attachments or p_forward:
            self.api.messages.send(
                user_id=send_to, message=message,
                attachment=p_attachments,
                forward_messages=p_forward)

        if destroy:
            accept_msg_id = self.api.messages                 .getHistory(peer_id=user_id, count=1)                 .get('items')[0].get('id')

            self.delete(accept_msg_id, destroy_type=destroy_type)

    def delete(self, msg_id, destroy_type=1):
        self.api.messages.delete(message_id=msg_id, delete_for_all=destroy_type)

Создадим метод для обновления участников сообщества. Сразу разделим их на администраторов и участников и сохраним в бд.


  • self.api настраивается при создании базового класса Group (BaseCommunicateVK)

def update_members(self):
    fields = 'domain, sex'

    admins = self.api.groups.getMembers(group_id=self.group_id, fields=fields, filter='managers')
    self.save_members(self._configure_users(admins))

    members = self.api.groups.getMembers(group_id=self.group_id, fields=fields)
    self.save_members(self._configure_users(members))

    return self

def save_members(self, members):
    self.storage.update(members)

@staticmethod
def _configure_users(items, exclude=None):

    if exclude is None:
        exclude = []

    users = []
    for user in items.get('items'):

        if user.get('id') not in exclude:
            member = User()
            member.configure(**user)
            users.append(member)

    return users

Еще этот класс должен уметь рассылать сообщения адресатам, поэтому следующий метод в студию. В параметрах: список адресатов, текст сообщения и приложения. Запускается всё это дело в отдельном потоке чтобы бот мог принимать сообщения от других участников.
Принимаются сообщения в синхронном режиме, поэтому с увеличением числа активных клиентов скорость отклика, очевидно, поубавится.


 def broadcast(self, uids, message, attachments=None, **kwargs):

    report = BroadcastReport()

    def send_all():
        users_ids = uids
        if not isinstance(users_ids, list):
            users_ids = list(users_ids)

        report.should_be_sent = len(users_ids)

        for user_id in users_ids:
            try:
                self.send(user_id, message, attachments, **kwargs)
                if message or attachments:
                    report.sent += 1

            except vk_api.VkApiError as error:
                report.errors.append('vk.com/id{}: {}'.format(user_id, error))

            except ValueError:
                continue

        for uid in self.get_member_ids(admins=True, moders=True):
            self.send(uid, str(report))

    broadcast_thread = Thread(target=send_all)
    broadcast_thread.start()
    broadcast_thread.join()

BroadcastReport - класс отчета
class BroadcastReport:

    def __init__(self):
        self.should_be_sent = 0
        self.sent = 0
        self.errors = []

    def __str__(self):
        res = "# Отчет #"
        res += "\nПлан: {} сообщений ".format(self.should_be_sent)
        res += "\nРазослано: {} ".format(self.sent)
        if self.errors:
            res += "\nОшибки:"
            for i in self.errors:
                res += "\n- {}".format(i)
        return res

На этом, вроде бы, абстракция группы закончена. Со всеми участниками сообщества познакомились, теперь надо научиться их понимать.


Часть 2. "Пш… прием.."


Заставим бота слушать все сообщения от участников нашего сообщества.
Для этого создадим класс СhatHandler, который и будет этим заниматься
В параметрах:


  • group_manager это экземпляр только что нами написанного класса сообщества
  • command_observer распознает подключенные команды (но об этом в третьей части)

class ChatHandler(Handler):

    def __init__(self, group_manager, command_observer):
        super().__init__()

        self.longpoll = group_manager.get_longpoll()
        self.group = group_manager
        self.api = group_manager.get_api()
        self.command_observer = command_observer

Дальше, собственно, слушаем сообщения от пользователей и распознаем команды.


def listen(self):
    try:
        for event in self.longpoll.listen():
            if event.user_id and event.type == VkEventType.MESSAGE_NEW and event.to_me:
                self.group.api.messages.markAsRead(peer_id=event.user_id)
                self.handle(event.user_id, event.text, event.attachments, message_id=event.message_id)

    except ConnectionError:
        logging.error("I HAVE BEEN DOWNED AT {}".format(datetime.datetime.today()))
        self.longpoll.update_longpoll_server()

def handle(self, user_id, message, attachments, **kwargs):
    member = self.group.get_member(user_id)
    self.group.update_members()
    self.command_observer.execute(member, message, attachments, self.group, **kwargs)

def run(self):
    self.listen()

Часть 3. "Что ты там написал про мою ..?"


Распознаванием команд занимается отдельная подсистема реализуемая через паттерн "Наблюдатель".
Внимание, CommandObserver:


class CommandObserver(AbstractObserver):

    def execute(self, member, message, attachments, group, **kwargs):

        for command in self.commands:

            for trigger in command.triggers:

                body = command.get_body(trigger, message)
                if body is not None:
                    group.api.messages.setActivity(user_id=member.id, type="typing")

                    if command.system:
                        kwargs.update({"trigger": trigger, "commands": self.commands})
                    else:
                        kwargs.update({"trigger": trigger})

        return command.proceed(member, body, attachments, group, **kwargs)

AbstractObserver

Опять же, вынесение сделано для будущего возможного расширения.


class AbstractObserver(metaclass=ABCMeta):

    def __init__(self):
        self.commands = []

    def add(self, *args):
        for arg in args:
            self.commands.append(arg)

    @abstractmethod
    def execute(self, *args, **kwargs):
        pass

Но что же этот наблюдатель будет распознавать?
Вот и добрались до самого интересного — команды.
Каждая команда — независимый класс, потомок базового класса Command.
Все что требуется от команды — запустить метод proceed(), если в начале сообщения пользователя найдено его ключевое слово. Ключевые слова команд определяются в переменной triggers класса команды (строка либо список строк)


class Command(metaclass=ABCMeta):

    def __init__(self):
        self.triggers = []
        self.description = "Empty description."

        self.system = False
        self.privilege = False

        self.activate_times = []
        self.activate_days = set()
        self.autostart_func = self.proceed

    def proceed(self, member, message, attachments, group, **kwargs):
        raise NotImplementedError()

    @staticmethod
    def get_body(kw, message):
        if not isinstance(kw, list): kw = [kw, ]

        for i in kw:
            reg = '^ *(\\{}) *'.format(i)

            if re.search(reg, message):
                return re.sub(reg, '', message).strip(' ')

Как видно из сигнатуры метода proceed(), каждая команда получает на вход ссылку на экземпляр участника группы, его сообщение (уже без ключевого слова), приложения и ссылку на экземпляр группы. То есть, всё взаимодействие с участником группы ложится на плечи команды. Я считаю это наиболее верным решением, так как таким образом возможно создание шелла (Shell) для большей интерактивности.
(По правде говоря, для этого нужно будет либо вносить асинхронщину, потому что обработка проходит синхронно, либо каждое полученное сообщение обрабатывать в новом потоке, что отнюдь не выгодно)


Примеры реализации команд:


BroadcastCommand
class BroadcastCommand(Command):

    def __init__(self):
        super().__init__()
        self.triggers = ['.mb']

        self.privilege = True
        self.description = "Рассылка сообщения всем участникам сообщества."

    def proceed(self, member, message, attachments, group, **kwargs):
        if member.id not in group.get_member_ids(admins=True, editors=True):
            group.send(member.id, "You cannot do this ^_^")
            return True

        last_message = group.get_last_message(member.id)
        group.broadcast(group.get_member_ids(), message, attachments, last_message=last_message, **kwargs)

        return True

HelpCommand
class HelpCommand(Command):

    def __init__(self):
        super().__init__()
        self.commands = []
        self.triggers = ['.h', '.help']

        self.system = True
        self.description = "Показ этого сообщения."

    def proceed(self, member, message, attachments, group, **kwargs):
        commands = kwargs["commands"]
        help = "Реализованы следующие команды:\n\n"
        admins = group.get_member_ids(admins=True, moders=True)
        i = 0
        for command in commands:
            if command.privilege and member.id not in admins:
                continue
            help += "{}) {}\n\n".format(i + 1, command.name())
            i += 1
        group.send(member.id, help)
        return True

Часть 4. "Мы же одна большая команда."


Теперь все эти модули и обработчики нужно объединить и настроить.
Еще один класс, пожалуйста!
Создаем фасад, который настроит нашего бота.


class VKManage:

    def __init__(self, token=None, login=None, password=None):
        self.session = BaseCommunicateVK.create_session(token, login, password, api_version)

        self.storage = DBProxy(DatabaseORM)
        self.group = Group(self.session, self.storage).setup().update_members()
        self.chat = ChatHandler(self.group, CommandObserver.get_observer())

    def start(self):
        self.chat.run()

    def get_command(self, command_name):
        return {
            "рассылка участникам": BroadcastCommand(),
            "рассылка админам": AdminBroadcastCommand(),
            "помощь": HelpCommand(),
            "учет прогулов": SkippedLectionsCommand(),
            "расписание": TopicTimetableCommand().setup_account(self.bot.api),
        }.get(command_name)

    def connect_command(self, command_name):
        command = self.get_command(str(command_name).lower())
        if command:
            self.chat.command_observer.add(command)
        return self

    def connect_commands(self, command_names):
        for i in command_names.split(','): self.connect_command(i.strip())
        return self

Последний этап — запуск. Всегда самый противный, потому что может вылезти какая-нибудь неожиданность. Не в этот раз.


  • ConfigParser импортируется из core.settings.ConfigParser. По сути просто читает конфиг.
  • project_path импортируется из модуля settings в корне проекта.


    if __name__ == '__main__':
    config = ConfigParser(project_path)
    
    VKManage(token=config['token'], login=config['login'], password=config['password'])    .connect_commands("помощь, рассылка участникам, рассылка админам, учет прогулов")    .start()


На этом, кажется, всё.


На данный момент эта программа принесла пользу, как минимум, трем группам и, надеюсь, вам тоже принесет.


Развернуть можно бесплатно на Heroku, но это уже другая история.


Ссылки:


Комментарии (0)