Пятничный рабочий день на удалёнке уже подходил к концу, как в дверь постучали, чтобы сообщить об установке нового домофона. Узнав, что новый домофон имеет мобильное приложение, позволяющее отвечать на звонки не находясь дома, я заинтересовался и сразу же загрузил его на свой телефон. Залогинившись, я обнаружил интересную особенность этого приложения — даже без активного вызова в мою квартиру я мог смотреть в камеру домофона и открывать дверь в произвольный момент времени. "Да это же онлайн АРI к двери подъезда!" — щёлкнуло в голове. Судьба предстоящих выходных была предрешена.

Видеодемонстрация в конце статьи.

Кадр из фильма «Пятый Элемент»
Кадр из фильма «Пятый Элемент»

Дисклеймер

Используемая технология аутентификации на основании распознавания лиц не учитывает множества сценариев атак и недостаточна для обеспечения безопасности. Описанная система анализа кадров была активна только в моменты, когда кому-то из моей семьи было необходимо попасть в подъезд — она не совершала автоматическую обработку лиц других людей и не могла привести к допуску в подъезд третьих лиц.

Получение API

Чтобы автоматизировать управление дверью, необходимо понять, куда и в каком формате отправляет запросы само приложение. Реверс-инжиниринг — дело неоднозначное, поэтому я попытался обойтись без него и вместо этого просто перехватить свой собственный трафик. Для этой задачи я взял НТТР Тооlkit — комплекс программ, который позволяет наладить прослушивание http(s) запросов собственного Android устройства.

Первая попытка оказывается провальной — после установки на телефон Android-части инструментария и сгенерированного Certificate authority оказалось, что мобильное приложение домофона не доверяет пользовательским СА. Согласно документации, начиная с Android 7 манифест приложения должен явно изъявлять такое желание.

Так как мой телефон не поддерживает root доступ для модификации списка системных СА, я воспользовался официальным эмулятором Android, идущим в комплекте с Android Studio. После запуска эмулятора и перехвата с помощью ADB меня встретило радостное сообщение о том, что трафик от всех приложений без Certificate pinning будет успешно расшифрован.

Успешное соединение с HTTP Toolkit
Успешное соединение с HTTP Toolkit

К счастью, приложение оказалось как раз из таких — немного побродив по приложению и открыв дверь, можно переходить к анализу логов.

Запрос открытия двери
Запрос открытия двери

Всего интересными показалось три запроса:

  1. Запрос на открытие двери подъезда: POST по адресу /rest/v1/places/{place_id}/accesscontrols/{control_id}/actions с JSON-телом {"name": "accessControlOpen"}

  2. Получение снимка (превью) с камеры: GET по адресу /rest/v1/places/{place_id}/accesscontrols/{control_id}/videosnapshots

  3. Получение ссылки на видеопоток с аудио: GET по адресу /rest/v1/forpost/cameras/{camera_id}/video?LightStream=0

HTTP заголовки всех трёх запросов содержат ключ Authorization — судя по всему, именно по нему происходит авторизация при выполнении запросов. Сделав пару запросов руками через Advanced REST Client, чтобы убедиться, что заголовка Authorization достаточно и в самостоятельной работе с API не осталось подводных камней, я понял, что можно откладывать эмулятор и переходить к написанию кода.

Вооружившись Python и requests, я написал необходимые для следующих этапов функции исполнения каждого из трёх действий:

HEADERS = {"Authorization": "Bearer ###"}
ACTION_URL = "https://###.ru/rest/v1/places/###/accesscontrols/###/"
VIDEO_URL = "https://###.ru/rest/v1/forpost/cameras/###/video?LightStream=0"

def get_image():
    result = requests.get(f'{ACTION_URL}/videosnapshots', headers=HEADERS)
    if result.status_code != 200:
        logging.error(f"Failed to get an image with status code {result.status_code}")
        return None
    logging.warning(f"Image received successfully in {result.elapsed.total_seconds()}sec")
    return result.content

def open_door():
    result = requests.post(
        f'{ACTION_URL}/actions', headers=HEADERS, json={"name": "accessControlOpen"})
    if result.status_code != 200:
        logging.error(f"Failed to open the door with status code {result.status_code}")
        return False
    logging.warning(f"Door opened successfully in {result.elapsed.total_seconds()}sec")
    return True

def get_videostream_link():
    result = requests.get(VIDEO_URL, headers=HEADERS)
    if result.status_code != 200:
        logging.error(f"Failed to get stream link with status code {result.status_code}")
        return False
    logging.warning(f"Stream link received successfully in {result.elapsed.total_seconds()}sec")
    return result.json()['data']['URL']

Поиск знакомых лиц в кадре

Прежде чем начать этот раздел, нужно рассказать пару слов об уже имеющихся на тот момент у меня в распоряжении серверных мощностях — это недорогая виртуальная машина с доступом ко всего одному потоку Intel(R) Xeon(R) CPU E5-2650L v3 @ 1.80GHz, 1GB оперативной памяти и 0 GPU. Тратиться на более дорогую конфигурацию не хотелось, а значит, нужно было попробовать выжать максимум из имеющихся ресурсов.

На данном этапе проектом заинтересовалась моя жена, хорошо разбирающаяся в машинном обучении и в итоге взявшая на себя эту задачу. Для этой работы был выбран OpenVINO Toolkit — инструментарий от Intel, в том числе заточенный как раз на запуск моделей машинного обучения на CPU.

Непродолжительный поиск существующих решений привёл на страницу Interactive Face Recognition Demo — официального демо, показывающего ровно необходимый функционал сравнения видимых в кадре лиц с базой заранее сохранённых. Единственная проблема состояла в том, что данный пример по каким-то причинам исчез после релиза 2020.3, а удобная установка пакета через pip у проекта появилась только с 2021.1. Было решено установить последнюю версию OpenVINO и адаптировать код под неё.

К счастью, гит помнит всё и получить из репозитория проекта код демо и нужные модели не составило труда. После удаления всей работы с командной строкой (взяв для всего значения по умолчанию), визуализацией и видеопотоком вебкамеры, был получен класс, способный распознавать лица в конкретном кадре:

class ImageProcessor:
    def __init__(self):
        self.frame_processor = FrameProcessor()

    def process(self, image):
        detections = self.frame_processor.process(image)
        labels = []
        for roi, landmarks, identity in zip(*detections):
            label = self.frame_processor.face_identifier.get_identity_label(
                identity.id)
            labels.append(label)
        return labels

После создания базы лиц из десятка селфи можно было переходить к тестированию фактической производительности полученного решения на имеющемся железе. В качестве подопытных картинок я взял фотографию себя и фото пустой улицы, сделанные на домофонную камеру функцией get_image() выше.

100 runs on an image with known face:
Total time: 7.356s
Time per frame: 0.007s
FPS: 135.944

100 runs on an image without faces:
Total time: 2.985s
Time per frame: 0.003s
FPS: 334.962

Очевидно, что показанная производительность с запасом покрывает нужды детектирования лиц в реальном времени.

1 FPS: Работа со снимками с камеры

Итак, на этом этапе у меня уже был класс для поиска заданных лиц в кадре, а также функции получения снимка с камеры и ссылки на видеопоток. С видео на тот момент я никогда не работал, потому для MVP решил переложить работу по выделению кадров на сервер и снова воспользоваться get_image().

class ImageProcessor:
		# <...>
    def process_single_image(self, image):
        nparr = np.fromstring(image, np.uint8)
        img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        labels = self.process(img_np)
        return labels

def snapshot_based_intercom_id():
    processor = ImageProcessor()

    last_open_door_time = time.time()
    while True:
        start_time = time.time()
        image = get_image()
        result = processor.process_single_image(image)
        logging.info(f'{result} in {time.time() - start_time}s')
        # Successfull detections are "face{N}"
        if any(['face' in res for res in result]):
            if start_time - last_open_door_time > 5:
                open_door()
                with open(f'images/{start_time}_OK.jfif', 'wb') as f:
                    f.write(image)
                last_open_door_time = start_time

Цикл получает картинки от домофона, ищет на них знакомые лица, открывает дверь в случае успешной детекции и сохраняет фото на память. Важно было не забыть ограничить частоту отправки команды открытия двери, т.к. лицо обычно распознаётся ещё на подходах к двери.

Момент успешного распознавания, версия с обработкой отдельных снимков
Момент успешного распознавания, версия с обработкой отдельных снимков

Заработало! Полностью довольный первым запуском, я вернулся в квартиру. Единственное, что портило впечатление от системы распознавания — время реакции на появление лица в кадре, т.к. время отклика API оставляло желать лучшего. Низкая частота поступления данных, 0.7с на получение картинки и 0.6с на открытие двери, давали видимый невооружённым взглядом лаг.

До 30 FPS: Обработка видеопотока

Получить кадры из видео оказалось на удивление просто:

vcap = cv2.VideoCapture(link)
success, frame = vcap.read()

Замеры показали, что камера домофона способна выдавать стабильные 30 FPS. Проблемой оказалось поддержание меньшей частоты кадров: метод read() возвращает последний необработанный кадр из внутренней очереди кадров. Если эта очередь наполняется видеопотоком быстрее, чем вычитывается, то обрабатываемые кадры начинают отставать от реального времени, что приводит к нежелательной задержке. Дополнительно, с практической точки зрения, распознавать лица 30 раз в секунду — пустая трата вычислительных ресурсов, так как обычно люди подходят к двери подъезда с небольшой скоростью.

Первым потенциальным решением было установить размер внутренней очереди: vcap.set(CV_CAP_PROP_BUFFERSIZE, 0);. Согласно найденной информации, такой трюк должен был хорошо работать с любой конфигурацией системы для версий OpenCV выше 3.4, но по какой-то причине, так и не оказал никакого влияния в моём случае. Единственной рабочей альтернативой стал подход, описанный в этом ответе со StackOverflow — завести отдельный поток, читающий кадры из камеры на максимально возможной скорости и сохраняющий последний в поле класса для дальнейшего доступа (впоследствии оказалось, что именно этот цикл ответственен за большую часть потребления процессора).

Получилась модификация ImageProcessor для обработки видеопотока с частотой 3 кадра в секунду:

class CameraBufferCleanerThread(threading.Thread):
    def __init__(self, camera, name='camera-buffer-cleaner-thread'):
        self.camera = camera
        self.last_frame = None
        self.finished = False
        super(CameraBufferCleanerThread, self).__init__(name=name)
        self.start()

    def run(self):
        while not self.finished:
            ret, self.last_frame = self.camera.read()

    def __enter__(self): return self

    def __exit__(self, type, value, traceback):
        self.finished = True
        self.join()

class ImageProcessor:
		# <...>
    def process_stream(self, link):
        vcap = cv2.VideoCapture(link)
        interval = 0.3 # ~3 FPS
        with CameraBufferCleanerThread(vcap) as cam_cleaner:
            while True:
                frame = cam_cleaner.last_frame
                if frame is not None:
                    yield (self.process(frame), frame)
                else:
                    yield (None, None)
                time.sleep(interval)

И соответствующая модификация snapshot_based_intercom_id:

def stream_based_intercom_id():
    processor = ImageProcessor()

    link = get_videostream_link()
    # To notify about delays
    last_time = time.time()
    last_open_door_time = time.time()
    for result, np_image in processor.process_stream(link):
        current_time = time.time()
        delta_time = current_time - last_time
        if delta_time < 1:
            logging.info(f'{result} in {delta_time}')
        else:
            logging.warning(f'{result} in {delta_time}')
        last_time = current_time
        if result is None:
            continue
        if any(['face' in res for res in result]):
            if current_time - last_open_door_time > 5:
                logging.warning(
                  	f'Hey, I know you - {result[0]}! Opening the door...')
                last_open_door_time = current_time
                open_door()
                cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)

Тест в реальных условиях показал ощутимое падение времени отклика — при хорошем освещении подъездная дверь оказывалась открытой ещё до того, как я успевал подойти к ней вплотную.

Момент успешного распознавания, версия с обработкой видеопотока
Момент успешного распознавания, версия с обработкой видеопотока

Управление с помощью Telegram бота

Сама система доказала свою работоспособность и теперь хотелось создать для неё удобный интерфейс для включения/выключения. Телеграм бот показался отличным вариантом решения для этой задачи.

С помощью пакета python-telegram-bot была написана простая оболочка, принимающая в себя callback включения/выключения системы и список доверенных никнеймов.

class TelegramInterface:
    def __init__(self, login_whitelist, state_callback):
        self.state_callback = state_callback
        self.login_whitelist = login_whitelist
        self.updater = Updater(
            token = "###", use_context = True)
        self.run()

    def run(self):
        dispatcher = self.updater.dispatcher
        dispatcher.add_handler(CommandHandler("start", self.start))
        dispatcher.add_handler(CommandHandler("run", self.run_intercom))
        dispatcher.add_handler(CommandHandler("stop", self.stop_intercom))

        self.updater.start_polling()

    def run_intercom(self, update: Update, context: CallbackContext):
        user = update.message.from_user
        update.message.reply_text(
            self.state_callback(True) if user.username in self.login_whitelist else 'not allowed',
            reply_to_message_id=update.message.message_id)

    def stop_intercom(self, update: Update, context: CallbackContext):
        user = update.message.from_user
        update.message.reply_text(
            self.state_callback(False) if user.username in self.login_whitelist else 'not allowed',
            reply_to_message_id=update.message.message_id)

    def start(self, update: Update, context: CallbackContext) -> None:
        update.message.reply_text('Hi!')
        
        
class TelegramBotThreadWrapper(threading.Thread):
    def __init__(self, state_callback, name='telegram-bot-wrapper'):
        self.whitelist = ["###", "###"]
        self.state_callback = state_callback
        super(TelegramBotThreadWrapper, self).__init__(name=name)
        self.start()

    def run(self):
        self.bot = TelegramInterface(self.whitelist, self.state_callback)

И следующая ступень эволюции функции intercom_id, обрабатывающая синхронизацию между потоками обработки данных и получения команд от бота:

def stream_based_intercom_id_with_telegram():
    processor = ImageProcessor()

    loop_state_lock = threading.Lock()

    loop_should_run = False
    loop_should_change_state_cv = threading.Condition(loop_state_lock)

    is_loop_finished = True
    loop_changed_state_cv = threading.Condition(loop_state_lock)

    def stream_processing_loop():
        nonlocal loop_should_run
        nonlocal loop_should_change_state_cv
        nonlocal is_loop_finished
        nonlocal loop_changed_state_cv

        while True:
            with loop_should_change_state_cv:
                loop_should_change_state_cv.wait_for(lambda: loop_should_run)
                is_loop_finished = False
                loop_changed_state_cv.notify_all()
                logging.warning(f'Loop is started')
            link = get_videostream_link()
            last_time = time.time()
            last_open_door_time = time.time()
            for result, np_image in processor.process_stream(link):
                with loop_should_change_state_cv:
                    if not loop_should_run:
                        is_loop_finished = True
                        loop_changed_state_cv.notify_all()
                        logging.warning(f'Loop is stopped')
                        break
                current_time = time.time()
                delta_time = current_time - last_time
                if delta_time < 1:
                    logging.info(f'{result} in {delta_time}')
                else:
                    logging.warning(f'{result} in {delta_time}')
                last_time = current_time
                if result is None:
                    continue
                if any(['face' in res for res in result]):
                    if current_time - last_open_door_time > 5:
                        logging.warning(f'Hey, I know you - {result[0]}! Opening the door...')
                        last_open_door_time = current_time
                        open_door()
                        cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)

    def state_callback(is_running):
        nonlocal loop_should_run
        nonlocal loop_should_change_state_cv
        nonlocal is_loop_finished
        nonlocal loop_changed_state_cv

        with loop_should_change_state_cv:
            if is_running == loop_should_run:
                return "Intercom service state is not changed"
            loop_should_run = is_running
            if loop_should_run:
                loop_should_change_state_cv.notify_all()
                loop_changed_state_cv.wait_for(lambda: not is_loop_finished)
                return "Intercom service is up"
            else:
                loop_changed_state_cv.wait_for(lambda: is_loop_finished)
                return "Intercom service is down"

    telegram_bot = TelegramBotThreadWrapper(state_callback)
    logging.warning("Bot is ready")
    stream_processing_loop()

Результат

Видео:

Послесловие

Несмотря на возможности, которые технология умных домофонов несёт жильцам, объединённые в единую сеть сотни (тысячи?) подъездных дверей с камерами и микрофонами (да, в произвольно получаемом видеопотоке есть и аудио!), ведущими непрерывную запись — как по мне, скорее пугающее явление, открывающее новые возможности для нарушения приватности.

Я бы предпочёл, чтобы доступ к видеопотоку предоставлялся только в момент звонка в квартиру и ведущаяся трёхдневная запись, позиционирующаяся как средство раскрытия правонарушений, хранилась не на серверах компании, а непосредственно в домофоне, с возможностью доступа к ней по запросу. Или не велась вовсе.