В прошлой статье я обещал показать скрипт, которым тяну видео с камеры и, хотя с тех пор прошло все же некоторое количество времени, обещания же нужно выполнять. Вот я и выполняю.

Так уж вышло, что с асинхронностью в мире серверного web ассоциируется все что угодно, но не PHP.

Ну потому что, ну вы знаете, вот эта умирающая модель, утечки памяти, да и вообще в PHP из коробки нет ничего, кроме stream_select() и stream_set_blocking().

Где-то там, на PECL, есть какой-то libuv, который в принципе всего лишь обертка для сишных функций оригинальной библиотеки, поэтому его использование как есть бросает вам некоторые вызовы. Да и вообще, кто в здравом уме будет этим заниматься?

Но если мы перестанем жить в мире PHP4 и немного вернемся в современные реалии, то увидим, что за последние годы дела несколько изменились. У нас появились такие интересные инструменты как ReactPHP и AmPHP, компоненты которых хорошо покрывают функционал Node.js, а наличие генераторов позволяет писать асинхронный код в удобном стиле, подобном async/await, избегая всех вот этих бесконечных коллбеков в коллбеках и километровых цепочек .then().then().then().

Так что поэтому сейчас, как мне кажется, практически нет тех задач из мира Node.js, которые не мог бы решить PHP. Но если таковые еще остались, то все упирется только в наличие каких-то отдельных библиотек, а не отсутствие возможностей как таковых.
It may surprise people to learn that the PHP standard library already has everything we need to write event-driven and non-blocking applications. We only reach the limits of native PHP’s functionality in this area when we ask it to poll thousands of file descriptors for IO activity at the same time. Even in this case, though, the fault is not with PHP but the underlying system select() call which is linear in its performance degradation as load increases.

amphp.org/amp/event-loop

Конечно, вопрос использования этого всего в production еще не закрыт до конца, но если не делать заведомо ошибочных вещей, которых вам не простит и любая другая асинхронная среда исполнения, то все будет хорошо.

DVRIP


Мы будем рассматривать протокол, предположительно придуманный китайцами, который используется для общения ПО c камерами наблюдения.

Работает это все поверх TCP, называется DVRIP (иногда Sofia) и к моменту, когда мне это было нужно, я нашел только две более-менее вменяемых библиотеки, одна из которых описывала вообще только подключение к камере и обмен парой-тройкой сообщений, но имела описание заголовка пакета, которое мне очень помогло.

Вторая была найдена чуть позже, в момент обострения у меня not invented here синдрома, да и работала немного не так как мне хотелось бы.

В общем, вооружившись сниффером и описанием заголовка пакета я кратенько набросал некий скрипт, который в одном виде крутится у меня где-то на сервере, а в виде некоего proof of concept будет представлен в этой статье.

Сам протокол не сложен и пакет выглядит следующим образом:

  • шапка размером 20 байт, которую в PHP можно обозначить как

    unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', $buffer)
  • за шапкой следует сообщение длинной len и может содержать либо json, дополненный двумя байтами \x0a\x00, либо бинарные данные.

Мы будем передавать только json, а вот получать и то и другое.

Ответ (если это пакет с json) обычно содержит поле Ret, указывающее на успешность выполнения нашего запроса. Ответы, содержащие код Ret не равный 100 или 515, мы будем считать заведомо ошибочными.

Вооружившись этими базовыми знаниями давайте подключимся к камере и будем разбираться.
Нам потребуется две библиотеки — amphp/socket, которая подтянет за собой практически все нужное, и evenement/evenement, которые доступны в packagist и не требуют никаких расширений.

Работа с TCP в Amp выглядит примерно так:

Loop::run(function () {
    $socket = yield connect("tcp://{$addr}:{$port}");
    
    yield $socket->write('Hello');
    
    while ($chunk = yield $socket->read()) {
        echo $chunk;
    }
    
    $socket->close();
});

DVRIP обычно работает на порту 34567, поэтому в нашем случае это будет что-то такое:

$socket = yield connect('tcp://192.168.0.200:34567');

Дальше мы логинимся, отправляя камере наш логин и хеш пароля, который вычисляется так:

function sofiaHash(string $password) : string
{
    $md5 = md5($password, true);    
    return implode('', array_map(function ($i) use ($md5) {
        $c = (ord($md5[2 * $i]) + ord($md5[2 * $i + 1])) % 62;
        $c += $c > 9 ? ($c > 35 ? 61 : 55) : 48;
        return chr($c);
    }, range(0, 7)));
}

В случае успешной авторизации мы запрашиваем видео и, если камера может нам его отдать, то запускаем его передачу.

Каждый тип пакета имеет свой msgId, благодаря чему мы можем понимать на какой запрос нам пришел ответ. В общем, не пыльная работёнка.

Для удобства работы давайте для начала опишем класс для нашего пакета:

class Packet
{
    public const SUCCESS_CODES = [100, 515];    
    public const MESSAGE_CODES = [
        'packet.request.login'      => 1000,
        'packet.request.keepAlive'  => 1006,
        'packet.request.claim'      => 1413,
        
        'packet.response.login'     => 1001,
        'packet.response.keepAlive' => 1007,
        'packet.response.claim'     => 1414,
        
        'packet.videoControl'       => 1410,
        'packet.binary'             => 1412,
    ];

    protected int $head = 255;
    protected int $version = 0;
    protected int $sessionId;
    protected int $sequence;
    protected int $msgId;
    
    protected ?string $rawData;
    protected ?array $data;
    
    public function __construct(int $msgId, ?array $data = null, int $sequence = 0, int $sessionId = 0)
    {
        $this->msgId = $msgId;
        $this->data = $data;
        $this->sequence = $sequence;
        $this->sessionId = $sessionId;
    }
    
    public function __toString() : string
    {
        if ($this->data !== null) {
            $this->rawData = json_encode($this->data, JSON_THROW_ON_ERROR) . "\x0a\x00";
        }
        return pack('CCx2IIx2SI', $this->head, $this->version, $this->sessionId, $this->sequence, $this->msgId, strlen($this->rawData))
            . $this->rawData;
    }
    
    public function getData() : ?array
    {
        return $this->data;
    }
    
    public function getRawData() : ?string
    {
        return $this->rawData;
    }
    
    public function getSession() : int
    {
        return $this->sessionId;
    }
    
    public function getSequence() : int
    {
        return $this->sequence;
    }
    
    public function getMessageType() : string
    {
        $types = array_flip(static::MESSAGE_CODES);
        if (!isset($types[$this->msgId])) {
            throw new \Exception('Unknown message type: ' . $this->msgId);
        }
        
        return $types[$this->msgId];
    }
}

и попробуем отправить запрос на авторизацию

yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [
    'EncryptType' => 'MD5',
    'LoginType'   => 'DVRIP-Web',
    'PassWord'    => sofiaHash('admin'),
    'UserName'    => 'admin',
]));

Ага, отлично. Мы подключились и отправили сообщение, но мы даже не знаем, правильное ли и отреагировала ли камера на него хоть как-то. Нам нужно как-то прочитать ответ (давайте сделаем это асинхронно), выделить в нем конкретные пакеты и декодировать их.

Чтоб как-то разделить логику обработки пакетов, приходящих в ответ, я решил воспользоваться библиотекой evenement и набросал на ее основе класс, который будет выбрасывать уже декодированные пакеты

class Reader extends EventEmitter
{
    protected InputStream $socket;
    
    public function __construct(InputStream $socket)
    {
        $this->socket = $socket;
    }
    
    public function start() : \Generator
    {
        $buffer = '';
        while (!$this->socket->isClosed() && $result = yield Packet::read($this->socket, $buffer)) {
            [$packet, $chunk] = $result;
            $buffer = $chunk;
            $chunk = null;
            $this->emit('packet', [$packet]);
            $this->emit($packet->getMessageType(), [$packet]);
        }
        
        $buffer = null;
    }
}

а так же добавил пару статических методов нашему классу Packet, которые будут, собственно говоря, выделять и декодировать пакеты из потока данных

public static function read(InputStream $socket, string $buffer = '') : Promise
{
    return call(function () use ($socket, $buffer) {
        $header = null;
        
        do {
            if ($header === null && strlen($buffer) > 20) {
                $header = unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', substr($buffer, 0, 20));
                $buffer = substr($buffer, 20);
            }
            
            if ($header !== null && strlen($buffer) >= (int) $header['len']) {
                return [
                    static::fromRaw($header, (int) $header['len'] > 0 ? substr($buffer, 0, (int) $header['len']) : null),
                    substr($buffer, (int) $header['len'])
                ];
            }
            
            $buffer .= yield $socket->read();                
        } while (!$socket->isClosed());
    });
}

protected static function fromRaw(array $header, ?string $rawData) : self
{
    $packet = new static($header['msgId'], null, $header['sequence'], $header['sessionId']);
    $packet->rawData = $rawData;
    
    $packet->data = (int) $packet->msgId === static::MESSAGE_CODES['packet.binary'] || $rawData === null
        ? null
        : json_decode(substr($rawData, 0, -2), true, 512, JSON_THROW_ON_ERROR);
    
    return $packet;
}

Теперь, благодаря использованию Evenement , мы можем в описать нашу логику в виде

$reader->on($packetType, $handler);

И вот такая вот цепочка у меня получилась, где я добавил еще Keep Alive и обработку SIGINT / SIGTERM:

$reader = new Reader($socket = yield connect('tcp://10.0.5.100:49152')); 

$reader->on('packet.response.login', asyncCoroutine(function (Packet $packet) use ($socket) {
    if (empty($packet->getData()['Ret']) || !in_array($packet->getData()['Ret'], Packet::SUCCESS_CODES)) {
        throw new \Exception('Wrong login data');
    }
    
    yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.claim'], [
        'Name' => 'OPMonitor',
        'OPMonitor' => ['Action' => 'Claim', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode'  => 'TCP']],
    ], 1, $packet->getSession()));
}));

$reader->once('packet.response.login', function (Packet $packet) use ($socket) {
    Loop::unreference(Loop::repeat($packet->getData()['AliveInterval'] * 1000, function ($watcherId) use ($socket, $packet) {
        if ($socket->getResource() === null) {
            return Loop::cancel($watcherId);   
        }
        
        yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.keepAlive'], [
            'Name' => 'KeepAlive', 'SessionID' => $packet->getData()['SessionID']
        ], 0, $packet->getSession()));
    }));
});

$reader->on('packet.response.claim', asyncCoroutine(function (Packet $packet) use ($socket) {
    yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [
        'Name' => 'OPMonitor',
        'OPMonitor' => ['Action' => 'Start', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode'  => 'TCP']],
    ], $packet->getSequence() + 1, $packet->getSession()));
}));

$emitter = new Emitter();    
$reader->on('packet.binary', function ($packet) use ($emitter) {
    $emitter->emit($packet->getRawData());
});

$reader->once('packet.binary', function (Packet $packet) use ($socket) {
    $signalHandler = function () use ($socket, $packet) {
        yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [
            'Name' => 'OPMonitor',
            'OPMonitor' => ['Action' => 'Stop', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode'  => 'TCP']],
        ], 0, $packet->getSession()));
        
        $socket->close();
    };
    Loop::unreference(Loop::onSignal(defined('SIGINT')  ? SIGINT  : 2,  $signalHandler, 'SIGINT'));
    Loop::unreference(Loop::onSignal(defined('SIGTERM') ? SIGTERM : 15, $signalHandler, 'SIGTERM'));
});

asyncCall([$reader, 'start']);

yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [
    'EncryptType' => 'MD5',
    'LoginType'   => 'DVRIP-Web',
    'PassWord'    => sofiaHash('123qwea'),
    'UserName'    => 'admin',
]));

yield pipe(new IteratorStream($emitter->iterate()), getStdout());

Само собой, что вместо stdout мы можем перенаправить поток видео куда душе угодно.

Итоговый скрипт
<?php

ini_set('display_errors', 'stderr');

use Amp\Emitter;
use Amp\Loop;
use Amp\Promise;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\IteratorStream;
use Evenement\EventEmitter;

use function Amp\call;
use function Amp\asyncCall;
use function Amp\asyncCoroutine;
use function Amp\ByteStream\getStderr;
use function Amp\ByteStream\getStdout;
use function Amp\ByteStream\pipe;
use function Amp\Socket\connect;

require 'vendor/autoload.php';

function sofiaHash(string $password) : string
{
    $md5 = md5($password, true);    
    return implode('', array_map(function ($i) use ($md5) {
        $c = (ord($md5[2 * $i]) + ord($md5[2 * $i + 1])) % 62;
        $c += $c > 9 ? ($c > 35 ? 61 : 55) : 48;
        return chr($c);
    }, range(0, 7)));
}

class Reader extends EventEmitter
{
    protected InputStream $socket;
    
    public function __construct(InputStream $socket)
    {
        $this->socket = $socket;
    }
    
    public function start() : \Generator
    {
        $buffer = '';
        while (!$this->socket->isClosed() && $result = yield Packet::read($this->socket, $buffer)) {
            [$packet, $chunk] = $result;
            $buffer = $chunk;
            $chunk = null;
            $this->emit('packet', [$packet]);
            $this->emit($packet->getMessageType(), [$packet]);
        }
        
        $buffer = null;
    }
}

class Packet
{
    public const SUCCESS_CODES = [100, 515];    
    public const MESSAGE_CODES = [
        'packet.request.login'      => 1000,
        'packet.request.keepAlive'  => 1006,
        'packet.request.claim'      => 1413,
        
        'packet.response.login'     => 1001,
        'packet.response.keepAlive' => 1007,
        'packet.response.claim'     => 1414,
        
        'packet.videoControl'       => 1410,
        'packet.binary'             => 1412,
    ];
    
    public static function read(InputStream $socket, string $buffer = '') : Promise
    {
        return call(function () use ($socket, $buffer) {
            $header = null;
            
            do {
                if ($header === null && strlen($buffer) > 20) {
                    $header = unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', substr($buffer, 0, 20));
                    $buffer = substr($buffer, 20);
                }
                
                if ($header !== null && strlen($buffer) >= (int) $header['len']) {
                    return [
                        static::fromRaw($header, (int) $header['len'] > 0 ? substr($buffer, 0, (int) $header['len']) : null),
                        substr($buffer, (int) $header['len'])
                    ];
                }
                
                $buffer .= yield $socket->read();                
            } while (!$socket->isClosed());
        });
    }
    
    protected static function fromRaw(array $header, ?string $rawData) : self
    {
        $packet = new static($header['msgId'], null, $header['sequence'], $header['sessionId']);
        $packet->rawData = $rawData;
        
        $packet->data = (int) $packet->msgId === static::MESSAGE_CODES['packet.binary'] || $rawData === null
            ? null
            : json_decode(substr($rawData, 0, -2), true, 512, JSON_THROW_ON_ERROR);
        
        return $packet;
    }
    
    protected int $head = 255;
    protected int $version = 0;
    protected int $sessionId;
    protected int $sequence;
    protected int $msgId;
    
    protected ?string $rawData;
    protected ?array $data;
    
    public function __construct(int $msgId, ?array $data = null, int $sequence = 0, int $sessionId = 0)
    {
        $this->msgId = $msgId;
        $this->data = $data;
        $this->sequence = $sequence;
        $this->sessionId = $sessionId;
    }
    
    public function __toString() : string
    {
        if ($this->data !== null) {
            $this->rawData = json_encode($this->data, JSON_THROW_ON_ERROR) . "\x0a\x00";
        }
        return pack('CCx2IIx2SI', $this->head, $this->version, $this->sessionId, $this->sequence, $this->msgId, strlen($this->rawData))
            . $this->rawData;
    }
    
    public function getData() : ?array
    {
        return $this->data;
    }
    
    public function getRawData() : ?string
    {
        return $this->rawData;
    }
    
    public function getSession() : int
    {
        return $this->sessionId;
    }
    
    public function getSequence() : int
    {
        return $this->sequence;
    }
    
    public function getMessageType() : string
    {
        $types = array_flip(static::MESSAGE_CODES);
        if (!isset($types[$this->msgId])) {
            throw new \Exception('Unknown message type: ' . $this->msgId);
        }
        
        return $types[$this->msgId];
    }
}

Loop::run(function () : \Generator {
    $reader = new Reader($socket = yield connect('tcp://10.0.5.100:49152')); 
    
    $reader->on('packet.response.login', asyncCoroutine(function (Packet $packet) use ($socket) {
        if (empty($packet->getData()['Ret']) || !in_array($packet->getData()['Ret'], Packet::SUCCESS_CODES)) {
            throw new \Exception('Wrong login data');
        }
        
        yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.claim'], [
            'Name' => 'OPMonitor',
            'OPMonitor' => ['Action' => 'Claim', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode'  => 'TCP']],
        ], 1, $packet->getSession()));
    }));
    
    $reader->once('packet.response.login', function (Packet $packet) use ($socket) {
        Loop::unreference(Loop::repeat($packet->getData()['AliveInterval'] * 1000, function ($watcherId) use ($socket, $packet) {
            if ($socket->getResource() === null) {
                return Loop::cancel($watcherId);   
            }
            
            yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.keepAlive'], [
                'Name' => 'KeepAlive', 'SessionID' => $packet->getData()['SessionID']
            ], 0, $packet->getSession()));
        }));
    });
    
    $reader->on('packet.response.claim', asyncCoroutine(function (Packet $packet) use ($socket) {
        yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [
            'Name' => 'OPMonitor',
            'OPMonitor' => ['Action' => 'Start', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode'  => 'TCP']],
        ], $packet->getSequence() + 1, $packet->getSession()));
    }));
    
    $emitter = new Emitter();    
    $reader->on('packet.binary', function ($packet) use ($emitter) {
        $emitter->emit($packet->getRawData());
    });
    
    $reader->once('packet.binary', function (Packet $packet) use ($socket) {
        $signalHandler = function () use ($socket, $packet) {
            yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [
                'Name' => 'OPMonitor',
                'OPMonitor' => ['Action' => 'Stop', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode'  => 'TCP']],
            ], 0, $packet->getSession()));
            
            $socket->close();
        };
        Loop::unreference(Loop::onSignal(defined('SIGINT')  ? SIGINT  : 2,  $signalHandler, 'SIGINT'));
        Loop::unreference(Loop::onSignal(defined('SIGTERM') ? SIGTERM : 15, $signalHandler, 'SIGTERM'));
    });
    
    asyncCall([$reader, 'start']);
    
    yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [
        'EncryptType' => 'MD5',
        'LoginType'   => 'DVRIP-Web',
        'PassWord'    => sofiaHash('123qwea'),
        'UserName'    => 'admin',
    ]));
    
    yield pipe(new IteratorStream($emitter->iterate()), getStdout());
});

Теперь можем попробовать записать кусочек видео таким вот образом

$ php recorder.php > output.h264

Мы добавили обработчик SIGINT, так что когда надоест, просто нажмите Ctrl+C и скрипт остановит передачу видео и нормально закроет соединение.

А еще мы можем, например, перенаправить поток в ffmpeg и перекодировать на лету.

$ php recorder.php | ffmpeg -nostdin -y -hide_banner -loglevel verbose -f h264 -i pipe:0 -pix_fmt yuv420p -c copy -movflags +frag_keyframe+empty_moov+default_base_moof -reset_timestamps 1 -f mpegts output.mpegts

А можно городить вообще неистовые цепочки вида

$ php recorder.php | ffmpeg ... | curl -d @- ... 

запуская все это каким-нибудь скриптом, который будет перезапускать каждый час, таким образом мы получим ровные кусочки продолжительностью в 1 час, на лету заливаемые куда нам надо.

Зачем?


Я склонен полагать, что эта статья вызовет у многих вопрос «А зачем?». Есть готовые решения для регистрации с камер, камеры в целом сами много чего умеют, как справедливо возразили в комментариях к прошлой статье, и вообще можно было сделать все куда проще.

Просто у меня было свободное время, которое нужно было куда-то девать, желание сделать по-своему и неуемная тяга к экспериментам.

И это работает.

Конечно, скрипт, представленный в статье, требует некоторых доделываний, и в таком виде пользоваться им не стоит — повторюсь, это proof of concept. И вообще, я вас всячески предостерегаю использовать наколенные самоделки в обеспечении безопасности своего дома.
В моем случае надежность не настолько критична, так что сработает и такой вариант. Над этим скриптом крутится скрипт-вотчер, который передает поток в ffmpeg, записывая куски по часу и заливает их в вк, перезапуская скрипт в случае обрыва связи или каких-то непредвиденностей.

А вообще всего этого можно было бы избежать, просто вытаскивая видео по RTSP over TCP, как я предлагал в конце предыдущей статьи, но ffmpeg любил предательски заглохнуть в случайный момент времени и начать реагировать только на SIGKILL.

Я начал подозревать, что все это потому что родительский скрипт-вотчер запускался кроном и получал пониженный приоритет, но увеличение приоритета лишь снизило частоту появления проблемы.

В случае же с вышеизложенным скриптом все работает куда стабильней и пока нареканий не вызывало.