Данная статья будет полезна тем, кто никогда прежде не экспериментировал с Raspberry, но считает, что этому самое время.
Привет, Хабр! Тенденция приписывать любому техническому устройству эпитет «умный» достигла, кажется, своего апогея (по количеству употребления, разумеется). К тому же, большинство моих знакомых не из IT-сферы до сих пор наивно полагают, что каждый уважающий себя программист живет в самом «умном» доме во всём квартале, у которого вместо стен — серверные стенды исполинских размеров, а в свободное от работы время этот же человек-программист выгуливает «умную» собаку из Boston Dynamics. Дабы не отставать от этих современных стандартов, мы с товарищем решили собственноручно создать что-то «умное», но несложное, поскольку в школе схемотехника и конструирование роботов обошли нас стороной.
Именно так нам пришла в голову идея создать полноценную систему на микросервисной архитектуре, позволяющую в любой момент времени фиксировать движения в помещении, а также записывать на камеру причину возникновения этих событий aka система охраны. Иначе говоря, мы решили, используя весь свой софтверный потенциал, реализовать полноценную интеграцию с камерой и датчиком движения.
Запланированная нами идея схематично могла быть представлена следующим образом:
![](https://habrastorage.org/getpro/habr/upload_files/d08/dd4/172/d08dd41722cccfb571431fbb9149a7e1.jpg)
У нас есть Raspberry Pi, снаряжённый датчиком детектирования движений, а также камерой для видеозаписи происходящего. Есть MQTT брокер для обеспечения передачи основной информации о возникшем событии между Raspberry и сервисом анализа данных Data Analyzer. Видеозаписи, возникающие вследствие каких-либо замеченных движений отправляются прямиком в объектное хранилище Object Storage. DB представляет собой реляционное хранилище. Завершает наш парад сервисов REST API сервер предоставляющий удобное взаимодействие с нашей системой наблюдения. Зеленым блоком выделено то, что рассматривается тут.
Пора переходить к более детальному рассмотрению каждого узла системы.
Raspberry Pi
Для разработчиков, чья профессиональная деятельность ранее никак не пересекалась с программированием каких-то железяк, мысли об одном лишь Raspberry Pi в голове могут сеять ровно те же эмоции, какие бывают у человека, первый раз увидевшим проект с юнит-тестами — самый настоящий хаос и множество переживаний (на деле беспочвенных). Подобное ощущал и я, однако желание сделать что-то с применением железа никак не давало покоя.
Для реализации наших идей были приобретены следующие устройства:
Raspberry Pi 4 с двумя гигабайтами оперативной памяти
SD-карта (она же жёсткий диск для Raspberry). К слову, экономить на SD-карте не стоит, поскольку скорость чтения / записи экстремально сильно влияют на производительность нашего Raspberry (проверено эмперически).
PIR-сенсор, контролирующий движение в помещении, а также камера для видеофиксации событий
переходник с microHDMI на HDMI
пачка проводов типа «Мама-Мама».
Камера
![](https://habrastorage.org/getpro/habr/upload_files/635/e4b/75a/635e4b75a1ced647f83fa7f53965c17f.jpg)
Оригинальный блок питания заказывать мы не стали – с этим справится любое зарядное устройство от мобильного телефона 5V/1A.
Для начала работы с Raspberry необходимо установить ОС. Взять её можно тут. Я был приятно удивлен, обнаружив, что операционная система уже включает в себя IDE как для Python, так и для Java. Рассмотрим немного подробнее схему подключения камеры и датчика движения. В Raspberry есть GPIO интерфейс, предназначенный для подключения различных сенсоров. Обратите внимание, что логическая нумерация пинов (то, как видит каждый пин ОС малины) не соответствует физической.
![](https://habrastorage.org/getpro/habr/upload_files/e21/3e3/c5a/e213e3c5ac9358214854fac7bf053e85.jpg)
Для подключения датчика движения необходимо воспользоваться тремя проводами типа «Мама-Мама» и подключить так, как показано на фото. Один провод подсоединяется к 5-вольтовому пину (5V на схеме) на малине и к питанию на сенсоре, следующий провод подводится к земле (маркировка GND на схеме) на малине и, соответственно, также к земле на датчике и, наконец, синий провод подключается в любой пин, с маркировкой GPIO + какой-либо номер. В нашем случае, сенсор будет передавать информацию о возникновении движения по GPIO26.
![](https://habrastorage.org/getpro/habr/upload_files/fb5/530/426/fb55304262b725d5595e61edf077d414.jpg)
После всех манипуляций приступим к написанию python-скрипта, отвечающего за основную логику работы датчика и камеры. Выбор данного языка обусловлен обильным количеством удобных библиотек для работы с Raspberry.
Начнем с проверки работоспособности PIR-сенсора:
from gpiozero import MotionSensor
from datetime import timezone
pir = MotionSensor(26)
while True:
pir.wait_for_motion()
dt = datetime.datetime.utcnow()
st = dt.strftime('%d.%m.%Y %H:%M:%S')
print("Motion Detected at : " + st)
При использовании датчика было обнаружено, что он может реагировать на Wi-Fi сигналы, провоцируя тем самым false-positive ошибки — ложное детектирование движения. Более того, движения возникали, по мнению датчика, ровно раз в минуту. Почитав разные форумы, выяснил, что самым простым решением этой проблемы является простое экранирование фольгой:
![Красиво и на белом фоне такое снять сложно. Красиво и на белом фоне такое снять сложно.](https://habrastorage.org/getpro/habr/upload_files/24d/253/a55/24d253a5536abe46a1e0312f2aa41fa8.jpg)
Работа с камерой также не вызывает особых сложностей. Принципы манипуляции камерой предлагаю рассмотреть уже в совокупности с объяснением основных принципов работы скрипта.
Для начала нам необходимо создать идентификатор нашего устройства, а учитывая распределенность нашей системы (малин с датчиком и камерой может быть неограниченное количество), автоинкремент тут не подойдет. В таком случае, используем UUID. При старте скрипта проверяем, есть ли у нас уже идентификатор устройства, считывая файл device_uuid. Если нет — создаём и записываем в файл.
import uuid
def getDeviceId():
try:
deviceUUIDFile = open("device_uuid", "r")
deviceUUID = deviceUUIDFile.read()
print("Device UUID : " + deviceUUID)
return deviceUUID
except FileNotFoundError:
print("Configuring new UUID for this device...")
deviceUUIDFile = open("device_uuid", "w")
deviceUUID = str(uuid.uuid4())
print("Device UUID : " + deviceUUID)
deviceUUIDFile.write(deviceUUID)
return deviceUUID
Создаем подключение к MQTT брокеру:
import paho.mqtt.client as mqtt
mqttClient = mqtt.Client("P1")
mqttClient.loop_start() # необходимо для поддержки реконнектов
mqttClient.connect(BROKER_ADDRESS)
Далее в while-true цикле проверяем наличие движения и в случае его возникновения генерируем json вида:
{
"device_id": "123e4567-e89b-12d3-a456-426614174000",
"id": "133d4167-18ds-11d1-b446-826314134110",
"place": "office_room",
"filename": "133d4167-18ds-11d1-b446-826314134110_alarm.mp4",
"type": "detected_motion",
"occurred_at": "01.01.2021 20:19:56»
}
и отправляем в MQTT брокер:
MP4_VIDEO_EXT = '.mp4'
alarmUUID = str(uuid.uuid4())
filename = '{}_alarm'.format(alarmUUID)
message = json.dumps({
'device_id': deviceUUID,
'id': alarmUUID,
'place': 'office_room',
'filename': filename + MP4_VIDEO_EXT,
'type': 'detected_motion',
'occurred_at': st
}, sort_keys=True)
mqttClient.publish("raspberry/main", message)
После отправки основной информации приступаем к записи видео. Сразу здесь продемонстрирую инициализацию камеры.
import picamera
VIDEO_TIME_SEC = 15
FILE_DIR = 'snapshots/'
MP4_VIDEO_EXT = '.mp4'
H264_VIDEO_EXT = '.h264'
camera = picamera.PiCamera()
camera.resolution = 640,480
def record(filename):
h264_file = filename + H264_VIDEO_EXT
print("Recording : " + h264_file)
camera.start_recording(h264_file)
camera.wait_recording(VIDEO_TIME_SEC)
camera.stop_recording()
print("Recorded")
# приятнее хранить файлы в родном mp4
mp4_file = filename + MP4_VIDEO_EXT
command = "MP4Box -add " + h264_file + " " + mp4_file
print("Converting from .h264 to mp4")
call([command], shell=True)
print(«Converted")
После того, как видео записано мы можем отправить его в объектное хранилище MinIO. Ниже представлен код инициализации клиента MinIO, а также процесс отправки файла. О самом MinIO мы поговорим чуть позже.
from minio import Minio
from minio.error import S3Error
MINIO_HOST = «0.0.0.0:443»
BUCKET_NAME = ‘raspberrycamera’
client = Minio(
MINIO_HOST,
access_key="minio",
secret_key="minio123",
secure=False
)
found = client.bucket_exists(BUCKET_NAME)
if not found:
client.make_bucket(BUCKET_NAME)
else:
print("Bucket {} already exists».format(BUCKET_NAME))
def sendToMinio(filename):
try:
print("Sending to minio")
client.fput_object(
BUCKET_NAME, filename, FILE_DIR + filename
)
print("Video has been sent")
except Exception as e:
print(e)
print("Couldn't send to Minio»)
Сервис анализа данных
Переходим к следующему участку нашей системы – сервису анализа данных. Основной зоной ответственности данного сервиса является получение траффика от Rasbperry и «укладка» полученной информации в реляционную базу данных. Также данный сервис предоставляет доступ к потоковому воспроизведению накопленных видеозаписей. Для удобства разработки уложим перечисленные выше сервисы в Docker контейнеры, а разворачивать будем с помощью docker-compose:
version: '3.1'
services:
app:
restart: on-failure
build:
context: .
dockerfile: Dockerfile
environment:
POSTGRES_URL: "jdbc:postgresql://database:5432/alarms"
POSTGRES_USER: "postgres"
POSTGRES_PASSWORD: "changeme"
MQTT_BROKER_HOST: "mosquitto"
MQTT_BROKER_PORT: "1883"
MQTT_BROKER_TOPICS: "raspberry/main"
MINIO_HOST: "https://minio"
MINIO_PORT: "443"
MINIO_ACCESS_KEY: "minio"
MINIO_SECRET_KEY: "minio123"
MINIO_BUCKET: "raspberrycamera"
ports:
- "8080:8080"
depends_on:
- database
links:
- database
database:
container_name: database
image: postgres
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=changeme
- POSTGRES_USER=postgres
- POSTGRES_DB=alarms
mosquitto:
image: eclipse-mosquitto
ports:
- 1883:1883
- 8883:8883
restart: unless-stopped
minio:
image: minio/minio
command: server --address ":443" /data
ports:
- "443:443"
environment:
MINIO_ACCESS_KEY: "minio"
MINIO_SECRET_KEY: "minio123"
volumes:
- /tmp/minio/data:/data
- /tmp/.minio:/root/.minio
MQTT-брокер
Сделаем небольшое отступление и обратим внимание на используемые инструменты.
В качестве узла связи выступает MQTT-брокер. MQTT протокол — это протокол обмена сообщениями по модели издатель-подписчик поверх протокола TCP/IP, где центральная часть протокола — MQTT-сервер или же брокер. Использование MQTT в нашем проекте было выбрано по ряду причин. Во-первых, он привлекателен своей легковесностью и компактностью, вследствие чего по сети будет перемещаться существенно меньший объем траффика, а это, в свою очередь, ведёт к снижению нагрузки как на оперативную память, так и снижает энергопотребление от альтернативного источника питания – аккумулятора (тут стоит отметить, что Raspberry может быть оснащён также аккумулятором для повышения отказоустойчивости в случае обесточивания места установки). Во-вторых, протокол изначально разрабатывался как способ поддержки связи между системами с ограниченной пропускной способностью и неопределенной стабильностью сети. Ну а тем более, чтобы не ограничивать свою фантазию в том, где можно расположить скрытую камеру – данное достоинство играет не последнюю роль, ровно как и значимость и уверенность в том, что сообщение будет гарантированно доставлено (как никак, конструируем систему охраны). В качестве MQTT-брокера был использован open-source брокер Mosquitto.
Объектное хранилище MinIO
Придерживаясь микросервисного подхода, ответственность за хранение видеозаписей хотелось бы скинуть на какой-то отдельный сервис. Что, собственно, и было сделано. Отличным решением такой задачи стало open-source хранилище MinIO. Разворачивается мгновенно, масштабируется превосходно, веб-интерфейс крайне удобен.
Решение предлагает пользователю размещать файлы любого расширения в так называемые bucket’ы (или же корзины):
![](https://habrastorage.org/getpro/habr/upload_files/7e1/88a/fa9/7e188afa9ebaf3ff35527dd83d799986.jpg)
Что ж, переходим к самому анализатору данных. Сервис написан на Java с использованием Spring фреймворка. Для инициализации MQTT-слушателя добавляем в зависимости следующее:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.2</version>
</dependency>
Сама же конфигурация выглядит следующим образом:
@Configuration
public class MqttConfiguration {
@Value("${mqtt.broker.host}")
private String brokerHost;
@Value("${mqtt.broker.port}")
private String brokerPort;
@Value("${mqtt.broker.topics}")
private String topics;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
String[] parsedTopics = parseTopics();
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
"tcp://" + brokerHost + ":" + brokerPort,
UUID.randomUUID().toString(),
parsedTopics);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
private String[] parseTopics() {
return topics.split(",");
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MqttMessageHandler();
}
}
Обработка полученных сообщений представлена классом MqttMessageHandler:
public class MqttMessageHandler implements MessageHandler {
@Autowired
private AlarmRepository alarmRepository;
@Autowired
private DeviceRepository deviceRepository;
private Gson gson = new GsonBuilder().create();
private DateFormat sdf = new SimpleDateFormat("dd.MM.yyyy H:m:s");
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String payload = (String) message.getPayload();
Map<String, String> parsedMessage = (Map<String, String>) gson.fromJson(payload, Map.class);
long occurredAt = 0L;
try {
occurredAt = sdf.parse(parsedMessage.get("occurred_at")).getTime();
} catch (ParseException e) {
e.printStackTrace();
return;
}
UUID deviceID = UUID.fromString(parsedMessage.get("device_id"));
Device device = new Device(deviceID, "", new Date().getTime(), occurredAt);
deviceRepository.saveAndFlush(device);
Alarm alarm = new Alarm(
UUID.fromString(parsedMessage.get("id")),
parsedMessage.get("place"),
parsedMessage.get("filename"),
parsedMessage.get("type"),
device,
occurredAt,
false
);
alarmRepository.saveAndFlush(alarm);
}
}
Добавим в зависимости следующее:
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.0.3</version>
</dependency>
Конфигурация MinIO:
@Configuration
public class MinioConfiguration {
@Value("${minio.host}")
private String host;
@Value("${minio.port}")
private String port;
@Value("${minio.access.key}")
private String accessKey;
@Value("${minio.secret.key}")
private String secretKey;
@Value("${minio.bucket}")
private String bucket;
@Bean
public MinioClient getClient() {
return MinioClient.builder()
.endpoint(host, Integer.parseInt(port), false)
.credentials(accessKey, secretKey)
.build();
}
@Bean
public MinioFileManager getManager(MinioClient client) {
return new MinioFileManager(client);
}
}
Изящно, не так ли?
MinioFileManager — класс собственной реализации, основной целью которого является облегчить жизнь при манипуляциях с файлами.
Пример самой работы с MinIO будет описан чуть ниже — для начала давайте поговорим о некоторых нюансах реализации потокового воспроизведения медиа-контента поверх HTTP протокола.
HTTP video streaming
В нашем случае будем рассматривать передачу именно видео-контента.
Потоковое воспроизведение, в общем случае, представляет собой разбиение некоторого контента на части и непрерывную передачу кусочков данных клиенту. Величина этих самых кусочков указывается клиентом в заголовке запроса Range. К примеру, так: bytes=0-1000000. В свою очередь сервер «вырезает» указанный интервал байт из видео и выдаёт клиенту с кодом HTTP = 203 (Partial content). Помимо этого, важно учесть, что сервер должен быть способен отдать также и всё видео целиком. Существенная разница заключается в том, что код ответа должен быть равен 200. Перечислим необходимые заголовки ответа сервера для корректного отображения видео:
Content-Type. Указывает возвращаемый тип данных. В нашем случае используется video/mp4
Accept-Ranges. Данным заголовком сервер информирует клиента о том, что тот способен выдавать контент ранжировано, а также явно указывает то, в чём желает видеть интервалы разбиения — в нашем случае в байтах: Accept-Ranges: bytes.
Content-Length. Не менее важный заголовок, отвечающий за результирующее количество байт передаваемого медиа-файла. В случае некорректно посчитанной длины, видео может попросту не загрузиться (особенно чувствительны к этому заголовку мобильные браузеры).
Content-Range. Сервер здесь явно указывает интервал байт, который будет передан клиенту, а также информирует о суммарном количестве байт в видео: Content-Range: bytes 1000-15000/250000.
Вернёмся теперь в код. Метод readFile взаимодействует с MinIO клиентом для получения массива байт. Далее на основании отправленного с клиента заголовка Range в методе slice определяется, нужно ли вырезать кусок из видео или же отдать его в том виде, в котором он был помещён в объектное хранилище.
public class MinioFileManager implements FileManager {
@Value("${minio.bucket}")
private String bucket;
private final MinioClient client;
public MinioFileManager(MinioClient mc) {
client = mc;
}
public Video getVideo(String filename, VideoRange range) throws Exception {
byte[] data = readFile(filename);
Video video = new Video(data);
return slice(video, range);
}
private Video slice(Video video, VideoRange range) {
if (range.wholeVideo()) {
return video;
}
int finalSize;
if (video.shorterThan(range.getEnd()) || range.withNoEnd()) {
finalSize = video.getSize() - (int) range.getStart();
} else {
finalSize = (int) range.difference();
}
byte[] result = new byte[finalSize];
System.arraycopy(video.asArray(), (int) range.getStart(), result, 0, result.length);
return new Video(result, false, video.getSize());
}
private byte[] readFile(String filename) throws Exception {
try (InputStream is = client.getObject(
GetObjectArgs.builder()
.bucket(bucket)
.object(filename)
.build())) {
ByteArrayOutputStream bufferedOutputStream = new ByteArrayOutputStream();
byte[] data = new byte[1024];
int nRead;
while ((nRead = is.read(data, 0, data.length)) != -1) {
bufferedOutputStream.write(data, 0, nRead);
}
int resultLength = bufferedOutputStream.size();
bufferedOutputStream.flush();
byte[] result = new byte[resultLength];
System.arraycopy(bufferedOutputStream.toByteArray(), (int) 0, result, 0, result.length);
return result;
}
}
public void removeFile(String filename) {
List<DeleteObject> objects = new LinkedList<>();
objects.add(new DeleteObject(filename));
Iterable<Result<DeleteError>> results =
client.removeObjects(
RemoveObjectsArgs.builder().bucket(bucket).objects(objects).build());
try {
for (Result<DeleteError> result : results) {
DeleteError error = result.get();
System.out.println(
"Error in deleting object " + error.objectName() + "; " + error.message());
}
} catch (Exception e) {
e.printStackTrace();
}
}
Как мы помним, сервер также должен дополнить ответ на запрос рядом заголовков. За это отвечает класс VideoResponseFactory, который генерирует два вида респонса: либо с целым видео-контентом, либо же определенный интервал.
public class VideoResponseFactory {
private final String contentType = "video/mp4";
private final String CONTENT_TYPE = "Content-Type";
private final String ACCEPT_RANGES = "Accept-Ranges";
private final String CONTENT_LENGTH = "Content-length";
private final String CONTENT_RANGE = "Content-Range";
private ResponseEntity<byte[]> toPartialResponse(Video video, String stringRanges) {
long[] ranges = parseRanges(stringRanges);
long start = ranges[0];
long end = ranges[1];
long rangeEnd = end;
if (end == -1) {
rangeEnd = video.originalSize() - 1;
}
return ResponseEntity.status(HttpStatus.PARTIAL_CONTENT)
.header(CONTENT_TYPE, contentType)
.header(ACCEPT_RANGES, "bytes")
.header(CONTENT_LENGTH, String.valueOf(video.getSize()))
.header(CONTENT_RANGE, "bytes" + " " + start + "-" + rangeEnd + "/" + video.originalSize())
.body(video.asArray());
}
private long[] parseRanges(String stringRanges) {
String[] ranges = stringRanges.split("-");
long start = Long.parseLong(ranges[0].substring(6));
long end;
if (ranges.length > 1) {
end = Long.parseLong(ranges[1]);
} else {
end = -1;
}
return new long[] {start, end};
}
public ResponseEntity<byte[]> toResponse(Video video, String ranges) {
if (video.isFull()) {
return toFullResponse(video.asArray());
} else {
return toPartialResponse(video, ranges);
}
}
private ResponseEntity<byte[]> toFullResponse(byte[] video) {
return ResponseEntity.status(HttpStatus.OK)
.header(CONTENT_TYPE, contentType)
.header(CONTENT_LENGTH, String.valueOf(video.length))
.header(ACCEPT_RANGES, "bytes")
.body(video);
}
}
Сам метод контроллера выглядит так:
@GetMapping("/stream/{filename}/{filetype}")
public Mono<ResponseEntity<byte[]>> streamVideo(@RequestHeader(value = "Range", required = false) String httpRangeList,
@PathVariable("filename") String filename,
@PathVariable("filetype") String fileType) throws Exception {
Video video = fm.getVideo(filename, VideoRange.of(httpRangeList));
ResponseEntity<byte[]> response = rf.toResponse(video, httpRangeList);
Optional<Alarm> stored = repository.findAlarmByFilename(filename);
if (stored.isPresent()) {
Alarm alarm = stored.get();
alarm.seen();
repository.saveAndFlush(alarm);
}
return Mono.just(response);
}
Заключение
В течение довольно короткого промежутка времени была реализована полноценная IoT-система, детектирующая нежелательные движения, а также производящая видеофиксацию событий. TODO-лист на будущее выглядит так:
Реализация стройной системы удаленного конфигурирования подключенных в сеть устройств. Среди параметров конфигурирования: WiFi, ключи для MinIO, длительность видеозаписи, адрес и топики для брокера.
Возможность просмотра видео в режиме реального времени.
На этом у меня все.
Stay tuned!