Данная статья будет полезна тем, кто никогда прежде не экспериментировал с Raspberry, но считает, что этому самое время.

Привет, Хабр! Тенденция приписывать любому техническому устройству эпитет «умный» достигла, кажется, своего апогея (по количеству употребления, разумеется). К тому же, большинство моих знакомых не из IT-сферы до сих пор наивно полагают, что каждый уважающий себя программист живет в самом «умном» доме во всём квартале, у которого вместо стен — серверные стенды исполинских размеров, а в свободное от работы время этот же человек-программист выгуливает «умную» собаку из Boston Dynamics. Дабы не отставать от этих современных стандартов, мы с товарищем решили собственноручно создать что-то «умное», но несложное, поскольку в школе схемотехника и конструирование роботов обошли нас стороной.

Именно так нам пришла в голову идея создать полноценную систему на микросервисной архитектуре, позволяющую в любой момент времени фиксировать движения в помещении, а также записывать на камеру причину возникновения этих событий aka система охраны. Иначе говоря, мы решили, используя весь свой софтверный потенциал, реализовать полноценную интеграцию с камерой и датчиком движения.

Запланированная нами идея схематично могла быть представлена следующим образом:

У нас есть Raspberry Pi, снаряжённый датчиком детектирования движений, а также камерой для видеозаписи происходящего. Есть MQTT брокер для обеспечения передачи основной информации о возникшем событии между Raspberry и сервисом анализа данных Data Analyzer. Видеозаписи, возникающие вследствие каких-либо замеченных движений отправляются прямиком в объектное хранилище Object Storage. DB представляет собой реляционное хранилище. Завершает наш парад сервисов REST API сервер предоставляющий удобное взаимодействие с нашей системой наблюдения. Зеленым блоком выделено то, что рассматривается тут.

Пора переходить к более детальному рассмотрению каждого узла системы.

Raspberry Pi

Для разработчиков, чья профессиональная деятельность ранее никак не пересекалась с программированием каких-то железяк, мысли об одном лишь Raspberry Pi в голове могут сеять ровно те же эмоции, какие бывают у человека, первый раз увидевшим проект с юнит-тестами — самый настоящий хаос и множество переживаний (на деле беспочвенных). Подобное ощущал и я, однако желание сделать что-то с применением железа никак не давало покоя. 

Для реализации наших идей были приобретены следующие устройства: 

  • Raspberry Pi 4 с двумя гигабайтами оперативной памяти

  • SD-карта (она же жёсткий диск для Raspberry). К слову, экономить на SD-карте не стоит, поскольку скорость чтения / записи экстремально сильно влияют на производительность нашего Raspberry (проверено эмперически).

  • PIR-сенсор, контролирующий движение в помещении, а также камера для видеофиксации событий

  • переходник с microHDMI на HDMI

  • пачка проводов типа «Мама-Мама». 

  • Камера

Оригинальный блок питания заказывать мы не стали – с этим справится любое зарядное устройство от мобильного телефона 5V/1A.

Для начала работы с Raspberry необходимо установить ОС. Взять её можно тут. Я был приятно удивлен, обнаружив, что операционная система уже включает в себя IDE как для Python, так и для Java. Рассмотрим немного подробнее схему подключения камеры и датчика движения. В Raspberry есть GPIO интерфейс, предназначенный для подключения различных сенсоров. Обратите внимание, что логическая нумерация пинов (то, как видит каждый пин ОС малины) не соответствует физической.

Для подключения датчика движения необходимо воспользоваться тремя проводами типа «Мама-Мама» и подключить так, как показано на фото. Один провод подсоединяется к 5-вольтовому пину (5V на схеме) на малине и к питанию на сенсоре, следующий провод подводится к земле (маркировка GND на схеме) на малине и, соответственно, также к земле на датчике и, наконец, синий провод подключается в любой пин, с маркировкой GPIO + какой-либо номер. В нашем случае, сенсор будет передавать информацию о возникновении движения по GPIO26.

После всех манипуляций приступим к написанию python-скрипта, отвечающего за основную логику работы датчика и камеры. Выбор данного языка обусловлен обильным количеством удобных библиотек для работы с Raspberry.

Начнем с проверки работоспособности PIR-сенсора:

from gpiozero import MotionSensor
from datetime import timezone

pir = MotionSensor(26)
while True:
        pir.wait_for_motion()
        dt = datetime.datetime.utcnow()
        st = dt.strftime('%d.%m.%Y %H:%M:%S')
        print("Motion Detected at : " + st)

При использовании датчика было обнаружено, что он может реагировать на Wi-Fi сигналы, провоцируя тем самым false-positive ошибки — ложное детектирование движения. Более того, движения возникали, по мнению датчика, ровно раз в минуту. Почитав разные форумы, выяснил, что самым простым решением этой проблемы является простое экранирование фольгой:

Красиво и на белом фоне такое снять сложно.
Красиво и на белом фоне такое снять сложно.

Работа с камерой также не вызывает особых сложностей. Принципы манипуляции камерой предлагаю рассмотреть уже в совокупности с объяснением основных принципов работы скрипта. 

Для начала нам необходимо создать идентификатор нашего устройства, а учитывая распределенность нашей системы (малин с датчиком и камерой может быть неограниченное количество), автоинкремент тут не подойдет. В таком случае, используем UUID. При старте скрипта проверяем, есть ли у нас уже идентификатор устройства, считывая файл device_uuid. Если нет — создаём и записываем в файл.

import uuid

def getDeviceId():
    try:
        deviceUUIDFile  = open("device_uuid", "r")
        deviceUUID = deviceUUIDFile.read()
        print("Device UUID : " + deviceUUID)
        return deviceUUID
    except FileNotFoundError:
        print("Configuring new UUID for this device...")
        deviceUUIDFile = open("device_uuid", "w")
        deviceUUID = str(uuid.uuid4())
        print("Device UUID : " + deviceUUID)
        deviceUUIDFile.write(deviceUUID)
        return deviceUUID

Создаем подключение к MQTT брокеру:

import paho.mqtt.client as mqtt

mqttClient = mqtt.Client("P1")
mqttClient.loop_start() # необходимо для поддержки реконнектов 
mqttClient.connect(BROKER_ADDRESS)

Далее в while-true цикле проверяем наличие движения и в случае его возникновения генерируем json вида: 

{
  "device_id": "123e4567-e89b-12d3-a456-426614174000",
  "id": "133d4167-18ds-11d1-b446-826314134110",
  "place": "office_room",
  "filename": "133d4167-18ds-11d1-b446-826314134110_alarm.mp4",
  "type": "detected_motion",
  "occurred_at": "01.01.2021 20:19:56»
}

и отправляем в MQTT брокер:

MP4_VIDEO_EXT = '.mp4'

alarmUUID = str(uuid.uuid4())
        filename = '{}_alarm'.format(alarmUUID)
        message = json.dumps({
                                'device_id': deviceUUID,
                                'id': alarmUUID,
                                'place': 'office_room',
                                'filename': filename + MP4_VIDEO_EXT,
                                'type': 'detected_motion',
                                'occurred_at': st
                                }, sort_keys=True)
        mqttClient.publish("raspberry/main", message)

После отправки основной информации приступаем к записи видео. Сразу здесь продемонстрирую инициализацию камеры.

import picamera

VIDEO_TIME_SEC = 15
FILE_DIR = 'snapshots/'
MP4_VIDEO_EXT = '.mp4'
H264_VIDEO_EXT = '.h264'
camera = picamera.PiCamera()
camera.resolution = 640,480

def record(filename):
    h264_file = filename + H264_VIDEO_EXT
    print("Recording : " + h264_file)
    camera.start_recording(h264_file)
    camera.wait_recording(VIDEO_TIME_SEC)
    camera.stop_recording()
    print("Recorded")
    
    # приятнее хранить файлы в родном mp4
    mp4_file = filename + MP4_VIDEO_EXT
    command = "MP4Box -add " + h264_file + " " + mp4_file
    print("Converting from .h264 to mp4")
    
    call([command], shell=True)
    print(«Converted")

После того, как видео записано мы можем отправить его в объектное хранилище MinIO. Ниже представлен код инициализации клиента MinIO, а также процесс отправки файла. О самом MinIO мы поговорим чуть позже.

from minio import Minio
from minio.error import S3Error

MINIO_HOST = «0.0.0.0:443»
BUCKET_NAME = ‘raspberrycamera’
client = Minio(
        MINIO_HOST,
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
found = client.bucket_exists(BUCKET_NAME)
if not found:
    client.make_bucket(BUCKET_NAME)
else:
    print("Bucket {} already exists».format(BUCKET_NAME))

def sendToMinio(filename):
    try:
        print("Sending to minio")
        client.fput_object(
            BUCKET_NAME, filename, FILE_DIR + filename
        )
        print("Video has been sent")
    except Exception as e:
        print(e)
        print("Couldn't send to Minio»)

Сервис анализа данных

Переходим к следующему участку нашей системы – сервису анализа данных. Основной зоной ответственности данного сервиса является получение траффика от Rasbperry и «укладка» полученной информации в реляционную базу данных. Также данный сервис предоставляет доступ к потоковому воспроизведению накопленных видеозаписей. Для удобства разработки уложим перечисленные выше сервисы в Docker контейнеры, а разворачивать будем с помощью docker-compose:

version: '3.1'
services:
  app:
    restart: on-failure
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      POSTGRES_URL: "jdbc:postgresql://database:5432/alarms"
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "changeme"

      MQTT_BROKER_HOST: "mosquitto"
      MQTT_BROKER_PORT: "1883"
      MQTT_BROKER_TOPICS: "raspberry/main"

      MINIO_HOST: "https://minio"
      MINIO_PORT: "443"
      MINIO_ACCESS_KEY: "minio"
      MINIO_SECRET_KEY: "minio123"
      MINIO_BUCKET: "raspberrycamera"
    ports:
      - "8080:8080"
    depends_on:
      - database
    links:
      - database
  database:
    container_name: database
    image: postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=changeme
      - POSTGRES_USER=postgres
      - POSTGRES_DB=alarms

  mosquitto:
    image: eclipse-mosquitto
    ports:
      - 1883:1883
      - 8883:8883
    restart: unless-stopped

  minio:
    image: minio/minio
    command: server --address ":443" /data
    ports:
      - "443:443"
    environment:
      MINIO_ACCESS_KEY: "minio"
      MINIO_SECRET_KEY: "minio123"
    volumes:
      - /tmp/minio/data:/data
      - /tmp/.minio:/root/.minio

MQTT-брокер

Сделаем небольшое отступление и обратим внимание на используемые инструменты.

В качестве узла связи выступает MQTT-брокер. MQTT протокол — это протокол обмена сообщениями по модели издатель-подписчик поверх протокола TCP/IP, где центральная часть протокола — MQTT-сервер или же брокер. Использование MQTT в нашем проекте было выбрано по ряду причин. Во-первых, он привлекателен своей легковесностью и компактностью, вследствие чего по сети будет перемещаться существенно меньший объем траффика, а это, в свою очередь, ведёт к снижению нагрузки как на оперативную память, так и снижает энергопотребление от альтернативного источника питания – аккумулятора (тут стоит отметить, что Raspberry может быть оснащён также аккумулятором для повышения отказоустойчивости в случае обесточивания места установки). Во-вторых, протокол изначально разрабатывался как способ поддержки связи между системами с ограниченной пропускной способностью и неопределенной стабильностью сети. Ну а тем более, чтобы не ограничивать свою фантазию в том, где можно расположить скрытую камеру – данное достоинство играет не последнюю роль, ровно как и значимость и уверенность в том, что сообщение будет гарантированно доставлено (как никак, конструируем систему охраны). В качестве MQTT-брокера был использован open-source брокер Mosquitto.

Объектное хранилище MinIO

Придерживаясь микросервисного подхода, ответственность за хранение видеозаписей хотелось бы скинуть на какой-то отдельный сервис. Что, собственно, и было сделано. Отличным решением такой задачи стало open-source хранилище MinIO. Разворачивается мгновенно, масштабируется превосходно, веб-интерфейс крайне удобен. 

Решение предлагает пользователю размещать файлы любого расширения в так называемые bucket’ы (или же корзины):

Что ж, переходим к самому анализатору данных. Сервис написан на Java с использованием Spring фреймворка. Для инициализации MQTT-слушателя добавляем в зависимости следующее:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.4.2</version>
</dependency>

Сама же конфигурация выглядит следующим образом:

@Configuration
public class MqttConfiguration {

    @Value("${mqtt.broker.host}")
    private String brokerHost;

    @Value("${mqtt.broker.port}")
    private String brokerPort;

    @Value("${mqtt.broker.topics}")
    private String topics;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        String[] parsedTopics = parseTopics();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        "tcp://" + brokerHost + ":" + brokerPort,
                        UUID.randomUUID().toString(),
                        parsedTopics);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    private String[] parseTopics() {
        return topics.split(",");
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MqttMessageHandler();
    }
}

Обработка полученных сообщений представлена классом MqttMessageHandler:

public class MqttMessageHandler implements MessageHandler {

    @Autowired
    private AlarmRepository alarmRepository;

    @Autowired
    private DeviceRepository deviceRepository;

    private Gson gson = new GsonBuilder().create();

    private DateFormat sdf = new SimpleDateFormat("dd.MM.yyyy H:m:s");

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String payload = (String) message.getPayload();
        Map<String, String> parsedMessage = (Map<String, String>) gson.fromJson(payload, Map.class);
        long occurredAt = 0L;
        try {
            occurredAt = sdf.parse(parsedMessage.get("occurred_at")).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
            return;
        }
        UUID deviceID = UUID.fromString(parsedMessage.get("device_id"));
        Device device = new Device(deviceID, "", new Date().getTime(), occurredAt);
        deviceRepository.saveAndFlush(device);

        Alarm alarm = new Alarm(
                UUID.fromString(parsedMessage.get("id")),
                parsedMessage.get("place"),
                parsedMessage.get("filename"),
                parsedMessage.get("type"),
                device,
                occurredAt,
                false
        );
        alarmRepository.saveAndFlush(alarm);
    }
}

Добавим в зависимости следующее:

<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.0.3</version>
</dependency>

Конфигурация MinIO:

@Configuration
public class MinioConfiguration {

    @Value("${minio.host}")
    private String host;

    @Value("${minio.port}")
    private String port;

    @Value("${minio.access.key}")
    private String accessKey;

    @Value("${minio.secret.key}")
    private String secretKey;

    @Value("${minio.bucket}")
    private String bucket;

    @Bean
    public MinioClient getClient() {
        return MinioClient.builder()
                .endpoint(host, Integer.parseInt(port), false)
                .credentials(accessKey, secretKey)
                .build();
    }

    @Bean
    public MinioFileManager getManager(MinioClient client) {
        return new MinioFileManager(client);
    }
}

Изящно, не так ли? 

MinioFileManager — класс собственной реализации, основной целью которого является  облегчить жизнь при манипуляциях с файлами.

Пример самой работы с MinIO будет описан чуть ниже — для начала давайте поговорим о некоторых нюансах реализации потокового воспроизведения медиа-контента поверх HTTP протокола.

HTTP video streaming

В нашем случае будем рассматривать передачу именно видео-контента. 

Потоковое воспроизведение, в общем случае, представляет собой разбиение некоторого контента на части и непрерывную передачу кусочков данных клиенту. Величина этих самых кусочков указывается клиентом в заголовке запроса Range. К примеру, так: bytes=0-1000000. В свою очередь сервер «вырезает» указанный интервал байт из видео и выдаёт клиенту с кодом HTTP = 203 (Partial content). Помимо этого, важно учесть, что сервер должен быть способен отдать также и всё видео целиком. Существенная разница заключается в том, что код ответа должен быть равен 200. Перечислим необходимые заголовки ответа  сервера для корректного отображения видео:

  • Content-Type. Указывает возвращаемый тип данных. В нашем случае используется video/mp4

  • Accept-Ranges. Данным заголовком сервер информирует клиента о том, что тот способен выдавать контент ранжировано, а также явно указывает то, в чём желает видеть интервалы разбиения — в нашем случае в байтах: Accept-Ranges: bytes.

  • Content-Length. Не менее важный заголовок, отвечающий за результирующее количество байт передаваемого медиа-файла. В случае некорректно посчитанной длины, видео может попросту не загрузиться (особенно чувствительны к этому заголовку мобильные браузеры).

  • Content-Range. Сервер здесь явно указывает интервал байт, который будет передан клиенту, а также информирует о суммарном количестве байт в видео: Content-Range: bytes 1000-15000/250000.

Вернёмся теперь в код. Метод readFile взаимодействует с MinIO клиентом для получения массива байт. Далее на основании отправленного с клиента заголовка Range в методе slice определяется, нужно ли вырезать кусок из видео или же отдать его в том виде, в котором он был помещён в объектное хранилище.

public class MinioFileManager implements FileManager {

    @Value("${minio.bucket}")
    private String bucket;

    private final MinioClient client;

    public MinioFileManager(MinioClient mc) {
        client = mc;
    }

    public Video getVideo(String filename, VideoRange range) throws Exception {
        byte[] data = readFile(filename);
        Video video = new Video(data);
        return slice(video, range);
    }

    private Video slice(Video video, VideoRange range) {
        if (range.wholeVideo()) {
            return video;
        }
        int finalSize;
        if (video.shorterThan(range.getEnd()) || range.withNoEnd()) {
            finalSize = video.getSize() - (int) range.getStart();
        } else {
            finalSize = (int) range.difference();
        }
        byte[] result = new byte[finalSize];
        System.arraycopy(video.asArray(), (int) range.getStart(), result, 0, result.length);
        return new Video(result, false, video.getSize());
    }

    private byte[] readFile(String filename) throws Exception {
        try (InputStream is = client.getObject(
                GetObjectArgs.builder()
                        .bucket(bucket)
                        .object(filename)
                        .build())) {
            ByteArrayOutputStream bufferedOutputStream = new ByteArrayOutputStream();
            byte[] data = new byte[1024];
            int nRead;
            while ((nRead = is.read(data, 0, data.length)) != -1) {
                bufferedOutputStream.write(data, 0, nRead);
            }
            int resultLength = bufferedOutputStream.size();
            bufferedOutputStream.flush();
            byte[] result = new byte[resultLength];
            System.arraycopy(bufferedOutputStream.toByteArray(), (int) 0, result, 0, result.length);
            return result;
        }
    }

    public void removeFile(String filename) {
        List<DeleteObject> objects = new LinkedList<>();
        objects.add(new DeleteObject(filename));
        Iterable<Result<DeleteError>> results =
                client.removeObjects(
                        RemoveObjectsArgs.builder().bucket(bucket).objects(objects).build());
        try {
            for (Result<DeleteError> result : results) {
                DeleteError error = result.get();
                System.out.println(
                        "Error in deleting object " + error.objectName() + "; " + error.message());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Как мы помним, сервер также должен дополнить ответ на запрос рядом заголовков. За это отвечает класс VideoResponseFactory, который генерирует два вида респонса: либо с целым видео-контентом, либо же определенный интервал.

public class VideoResponseFactory {

    private final String contentType = "video/mp4";

    private final String CONTENT_TYPE = "Content-Type";

    private final String ACCEPT_RANGES = "Accept-Ranges";

    private final String CONTENT_LENGTH = "Content-length";

    private final String CONTENT_RANGE = "Content-Range";

    private ResponseEntity<byte[]> toPartialResponse(Video video, String stringRanges) {
        long[] ranges = parseRanges(stringRanges);
        long start = ranges[0];
        long end = ranges[1];
        long rangeEnd = end;
        if (end == -1) {
            rangeEnd = video.originalSize() - 1;
        }

        return ResponseEntity.status(HttpStatus.PARTIAL_CONTENT)
                .header(CONTENT_TYPE, contentType)
                .header(ACCEPT_RANGES, "bytes")
                .header(CONTENT_LENGTH, String.valueOf(video.getSize()))
                .header(CONTENT_RANGE, "bytes" + " " + start + "-" + rangeEnd + "/" + video.originalSize())
                .body(video.asArray());
    }

    private long[] parseRanges(String stringRanges) {
        String[] ranges = stringRanges.split("-");
        long start = Long.parseLong(ranges[0].substring(6));
        long end;
        if (ranges.length > 1) {
            end = Long.parseLong(ranges[1]);
        } else {
            end = -1;
        }
        return new long[] {start, end};
    }

    public ResponseEntity<byte[]> toResponse(Video video, String ranges) {
        if (video.isFull()) {
            return toFullResponse(video.asArray());
        } else {
            return toPartialResponse(video, ranges);
        }
    }

    private ResponseEntity<byte[]> toFullResponse(byte[] video) {
        return ResponseEntity.status(HttpStatus.OK)
                .header(CONTENT_TYPE, contentType)
                .header(CONTENT_LENGTH, String.valueOf(video.length))
                .header(ACCEPT_RANGES, "bytes")
                .body(video);
    }
}

Сам метод контроллера выглядит так:

		@GetMapping("/stream/{filename}/{filetype}")
    public Mono<ResponseEntity<byte[]>> streamVideo(@RequestHeader(value = "Range", required = false) String httpRangeList,
                                                    @PathVariable("filename") String filename,
                                                    @PathVariable("filetype") String fileType) throws Exception {
        Video video = fm.getVideo(filename, VideoRange.of(httpRangeList));
        ResponseEntity<byte[]> response = rf.toResponse(video, httpRangeList);
        Optional<Alarm> stored = repository.findAlarmByFilename(filename);
        if (stored.isPresent()) {
            Alarm alarm = stored.get();
            alarm.seen();
            repository.saveAndFlush(alarm);
        }
        return Mono.just(response);
    }

Заключение

В течение довольно короткого промежутка времени была реализована полноценная IoT-система, детектирующая нежелательные движения, а также производящая видеофиксацию событий.  TODO-лист на будущее выглядит так:

Реализация стройной системы удаленного конфигурирования подключенных в сеть устройств. Среди параметров конфигурирования: WiFi, ключи для MinIO, длительность видеозаписи, адрес и топики для брокера.

  1. Возможность просмотра видео в режиме реального времени.

На этом у меня все.

Stay tuned!