Привет, Хабр. В этой статье я хочу поделиться своим опытом создания приложения на фреймворке Laravel по трансляции видеоконтента. Итак начнём.

Проект опубликован как свободное ПО

Задача

Сделать сервис, совместимый с бизнес-моделью SaaS , принимающий данные по протоколу RTMP от разных поставщиков контента и раздающий этот контент по HLS конечным пользователям за плату или бесплатно, т.е. реализовать Live-трансляции.

Ингредиенты

Будем использовать свободное программное обеспечение. Для работы с RTMP и HLS мы будем использовать nginx с nginx-rtmp-module. Для выполнение веб-приложения мы будем использовать apache2, php, базу данных MariaDB. В качестве фреймворка Laravel с компонентами LiveWire для динамического обновления данных и для построения html страниц шаблоны Blade. Для обработки записанных трансляций будем использовать FFMPEG. Всё это на сервере Ubuntu 20.04 LTS.

Приступаем

Создали проект Laravel 8. Создаём миграции для баз данных. У нас будут пользователи, организации (поставщики контента), посты (то есть записи о будущих, текущих и прошедших трансляциях) и так далее.

Таблица Users:

<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

class CreateUsersTable extends Migration
{
    /**
     * Run the migrations.
     *
     * @return void
     */
    public function up()
    {
        Schema::create('users', function (Blueprint $table) {
            $table->id();
            $table->string('name');
            $table->string('email')->unique();
            $table->unsignedTinyInteger('access_level')->default(0); // 0 - user, 1 - editor, 2 - finmanager, 3 - admin, 4 - root(global admin)
            $table->timestamp('email_verified_at')->nullable();
            $table->string('password')->nullable();
            $table->rememberToken();
            $table->string('google_id')->nullable();
            $table->string('google_token')->nullable();
            $table->string('google_refresh_token')->nullable();
            $table->string('instagram_id')->nullable();
            $table->string('instagram_token')->nullable();
            $table->string('instagram_refresh_token')->nullable();
            $table->string('yandex_id')->nullable();
            $table->string('yandex_token')->nullable();
            $table->string('yandex_refresh_token')->nullable();
            $table->string('vk_id')->nullable();
            $table->string('vk_token')->nullable();
            $table->string('vk_refresh_token')->nullable();
            $table->foreignId('org_id')
                ->nullable()
                ->constrained()
                ->onUpdate('cascade')
                ->onDelete('restrict');
            $table->timestamps();
        });
    }

    /**
     * Reverse the migrations.
     *
     * @return void
     */
    public function down()
    {
        Schema::dropIfExists('users');
    }
}

Таблица Orgs:

<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

class CreateOrgsTable extends Migration
{
    /**
     * Run the migrations.
     *
     * @return void
     */
    public function up()
    {
        Schema::create('orgs', function (Blueprint $table) {
            $table->id();
            $table->string('fulltitle', 512)->nullable();
            $table->string('title', 128);
            $table->string('brandtitle', 128);
            $table->string('ogrn', 15);
            $table->string('inn', 12);
            $table->string('kpp', 9)->nullable();
            $table->string('address', 255);
            $table->string('drawer_status', 2)->nullable();
            $table->string('fintitle', 255);
            $table->string('personal_acc', 20);
            $table->string('bank_name', 128);
            $table->string('bic', 9);
            $table->string('corresp_acc', 20);
            $table->string('kbk', 20)->nullable();
            $table->string('titlekbk', 128)->nullable();
            $table->string('oktmo', 11)->nullable();
            $table->string('purpose', 255)->nullable();
            $table->string('email', 255);
            $table->string('tel', 10);;
            $table->timestamps();
        });
    }

    /**
     * Reverse the migrations.
     *
     * @return void
     */
    public function down()
    {
        Schema::dropIfExists('orgs');
    }
}

Таблица постов:

<?php
...
Schema::create('posts', function (Blueprint $table) {
            $table->id();
            $table->foreignId('org_id')
                ->nullable()
                ->constrained()
                ->cascadeOnUpdate()
                ->nullOnDelete();
            $table->boolean('record')->default(FALSE);
            $table->boolean('autorecord')->default(FALSE);
            $table->boolean('file_preparation')->default(FALSE);
            $table->boolean('rtmp_status')->default(FALSE);
            $table->ipAddress('rtmp_ip_sender')->nullable();
            $table->boolean('allow_comment')->default(FALSE);
            $table->string('title1', 64);
            $table->string('title2', 64)->nullable();
            $table->string('body', 2048)->nullable();
            $table->uuid('stream_name')->unique();
            $table->string('stream_token', 32);
            $table->dateTime('dt_begin');
            $table->dateTime('dt_end');
            $table->unsignedDecimal('price', 14, 2)->nullable();
            $table->unsignedBigInteger('timeleft')->nullable();
            $table->unsignedBigInteger('timepass')->nullable();
            $table->char('color', 4)->charset('binary')->nullable();
            $table->foreignId('picture_id')->nullable()->constrained('mediafiles')->cascadeOnUpdate()->nullOnDelete();
            $table->foreignId('videopreview_id')->nullable()->constrained('mediafiles')->cascadeOnUpdate()->nullOnDelete();
            $table->foreignId('video_id')->nullable()->constrained('mediafiles')->cascadeOnUpdate()->nullOnDelete();
            $table->foreignId('user_id') //author
                ->nullable()
                ->constrained()
                ->cascadeOnUpdate()
                ->nullOnDelete();
            $table->unsignedBigInteger('cv_before')->default(0);
            $table->unsignedBigInteger('cv_live')->default(0);
            $table->unsignedBigInteger('cv_after')->default(0);
            $table->timestamps();
        });

Настраиваем модели Eloquent

Модель пользователя:

<?php

namespace App\Models;

use Illuminate\Contracts\Auth\MustVerifyEmail;
use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Foundation\Auth\User as Authenticatable;
use Illuminate\Notifications\Notifiable;
use Laravel\Sanctum\HasApiTokens;
use Illuminate\Support\Facades\Auth;

class User extends Authenticatable
{
    use HasApiTokens, HasFactory, Notifiable;

    const AAL = [
        0 => 'Пользователь',
        1 => 'Редактор',
        2 => 'Финансовый менеджер',
        3 => 'Администратор',
        4 => 'root'
    ];

    /**
     * The attributes that are mass assignable.
     *
     * @var array<int, string>
     */
    protected $fillable = [
        'name',
        'email',
        'password',
        'google_id',
        'google_token',
        'google_refresh_token',
        'instagram_id',
        'instagram_token',
        'instagram_refresh_token',
        'vk_id',
        'vk_token',
        'vk_refresh_token',
        'yandex_id',
        'yandex_token',
        'yandex_refresh_token',
    ];

    protected $attributes = ['access_level' => 0];

    /**
     * The attributes that should be hidden for serialization.
     *
     * @var array<int, string>
     */
    protected $hidden = [
        'password',
        'remember_token',
        'google_id',
        'google_token',
        'google_refresh_token',
        'instagram_id',
        'instagram_token',
        'instagram_refresh_token',
        'vk_id',
        'vk_token',
        'vk_refresh_token',
        'yandex_id',
        'yandex_token',
        'yandex_refresh_token',
    ];

    /**
     * The attributes that should be cast.
     *
     * @var array<string, string>
     */
    protected $casts = [
        'email_verified_at' => 'datetime',
    ];

    public function getALAttribute()
    {
        return self::AAL[$this->access_level];
    }

    public function org() {
        return $this->belongsTo(Org::class);
    }

    public function scopeLimitAL($query){
        $ac = Auth::user()->access_level;
        if ($ac == 0) {
            return $query->where('id', Auth::id());
        } else {
            return $query->where('access_level', '<=', $ac);
        }
    }
}

Модель поста:

<?php

namespace App\Models;

use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model;
use App\Models\Mediafile;
use App\Models\User;
use App\Models\Org;
use App\Models\Ticket;
use Illuminate\Support\Facades\Auth;
use Illuminate\Support\Str;

class Post extends Model
{
    use HasFactory;

    public function picture() {
        return $this->belongsTo(Mediafile::class);
    }

    public function video() {
        return $this->belongsTo(Mediafile::class);
    }

    public function videopriview() {
        return $this->belongsTo(Mediafile::class);
    }

    public function getStreamStringAttribute() {
        return "{$this->stream_name}/{$this->stream_token}";
    }

    public function getCvAttribute() {
        return $this->cv_before + $this->cv_live + $this->cv_after;
    }

    public function tickets()
    {
        return $this->hasMany(Ticket::class);
    }

    public function user() {
        return $this->belongsTo(User::class);
    }

    public function org() {
        return $this->belongsTo(Org::class);
    }

    protected static function booted()
    {
        static::creating(function (Post $post) {
            $post->user_id = Auth::id();
            $post->org_id = Auth::user()->org_id;
            $post->stream_name = Str::uuid();
            $post->stream_token = Str::random(32);
        });
    }
}

Пишем контроллеры

По сути вся логика приложения пишется в контроллерах. С пользователями и другими моделями достаточно всё тривиально. Рассмотрим контроллер постов и контроллер взаиморасчетов между организациями и пользователями.

Контроллер постов:

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Auth;
use App\Models\Mediafile;
use App\Models\Post;
use App\Jobs\StartRec;
use App\Jobs\StopRec;
use App\Jobs\StopRTMP;

class PostController extends Controller
{

    public function show($id = 0)
    {
        if ($id > 0) {
            return view('post', ['edit' => 0, 'posts' => [Post::findOrFail($id)]]);
        } else {
            return view('post', ['edit' => 0, 'posts' => Post::orderByDesc('id')->paginate(10)]);
        }
    }

    public function index()
    {
        $lp = Post::select('id')->orderByDesc('id')->take(1)->get();
        if (isset($lp[0])) { $lpid = $lp[0]['id']; } else { $lpid = 0; }
        return view('home', [
            'posts' => Post::orderByDesc('id')->paginate(32),
            'postsfuture' => Post::where('dt_begin', '>', now())->orderByDesc('id')->paginate(32),
            'postspast' => Post::where('dt_end', '<', now())->orderByDesc('id')->paginate(32),
            'postsnow' => Post::where('dt_end', '>', now())->where('dt_begin', '<', now())->orderByDesc('id')->paginate(32),
            'lpid' => $lpid
        ]);
    }

    public function edit($id = 0)
    {
        if (($id > 0) && (in_array(Auth::user()->access_level, [1, 3, 4]))) {
            return view('post', ['edit' => 1, 'post' => Post::findOrFail($id)]);
        } elseif (in_array(Auth::user()->access_level, [1, 3, 4])) {
            return view('post', ['edit' => 2]);
        }
    }

    public function rtmp_on(Request $request) {
        $ar = [
            'stream_name' => $request->input('name'),
            'stream_token' => $request->input('token')
        ];
        $post = Post::where($ar)->firstOr(function () { return false; });
        if ($post) {
            $post->rtmp_status = true;
            $post->rtmp_ip_sender = $request->input('addr');
            $post->save();
            if (($post->autorecord == true) && ($post->record == false)) {
                StartRec::dispatch($post);
            }
            return response()->noContent(); // allow
        } else {
            return response(null, 403); // forbidden
        }
    }

    public function rtmp_off(Request $request) {
        $ar = [
            'stream_name' => $request->input('name'),
            'rtmp_ip_sender' => $request->input('addr')
        ];
        $post = Post::where($ar)->firstOr(function () { return false; });
        if ($post) {
            $post->rtmp_status = false;
            $post->save();
            if ($post->record == true) {
                StopRec::dispatch($post);
            }
        }
        return response()->noContent();
    }

    public function rtmp_update(Request $request) {
        $ar = [
            'stream_name' => $request->input('name'),
            'stream_token' => $request->input('token')
        ];
        $post = Post::where($ar)->firstOr(function () { return false; });
        if ($post) {
            $post->rtmp_status = true;
            $post->rtmp_ip_sender = $request->input('addr');
            $post->timepass = $request->input('time');
            $post->save();
            return response()->noContent(); // allow
        } else {
            return response(null, 403); // forbidden
        }
    }
}

Этот контроллер у нас взаимодействует и конечным пользователем и сервером nginx. Маршруты к этому контроллеру для пользователя мы напишем в web.php:

<?php
...
Route::prefix('posts')->middleware('auth')->group(function () {
    Route::get('/{id?}', [PostController::class, 'show'])->where('id', '[0-9]+')->name('posts');
    Route::get('/{id}/edit', [PostController::class, 'edit'])->where('id', '[0-9]+')->name('editpost');
    Route::get('/add', [PostController::class, 'edit'])->name('addpost');
});

А для сервера в файле api.php:

<?php
...
  Route::post('stream/on_publish', [PostController::class, 'rtmp_on'])->name('rtmp_on');
Route::post('stream/on_publish_done', [PostController::class, 'rtmp_off'])->name('rtmp_off');
Route::post('stream/on_update', [PostController::class, 'rtmp_update'])->name('rtmp_update');

Логика такая: пользователь создаёт пост: пишет название, дата и время начала и конца, прикладывает картинку, при сохранение модель создаёт уникальные stream_name и stream_token. stream_name видят все, а stream_token только администраторы и автор поста. Запись внесена в базу данных. Затем автор запускает приложение для трансляции контента на сервер, например OBS. Указывает rtmp адрес сервера и запускает.

Данные принимает сервер nginx и отправляет запрос на приложение, как это указано в его настройках:

rtmp {
    server {
        listen 1935; # Listen on standard RTMP port
        chunk_size 8192;
        max_streams 32;

        application show {
            on_publish "http://live.example.org:80/api/stream/on_publish";
            live on;
            recorder rec1 {
                record all manual;
                record_suffix _rec.flv;
                record_path /var/www/live.example.org/storage/app/public/rec;
                record_unique on;
            }
            hls on;
            hls_path /var/www/live.example.org-hls/public_html/hls;
            hls_fragment 5;
            hls_cleanup on;
            hls_playlist_length 30;
            hls_nested on;
            deny play all;
            on_publish_done "http://live.example.org:80/api/stream/on_publish_done";
            notify_update_timeout 2s;
            on_update "http://live.example.org:80/api/stream/on_update";
        }
    }
}

То есть событие on_publish в сервере nginx вызывает метод rtmp_on у контроллера поста. Веб-приложение проверяет stream_name и stream_token т.е. есть ли вообще такой пост и совпадает ли токен, если да, то ответ для сервера HTTP 204, и сервер продолжает принимать данные RTMP, а если нет, то HTTP 403, сервер отказывает в приёме данных, в программе OBS выйдет ошибка I/O error. rtmp_off - вносит меняет статус поста на "трансляция завершена". rtmp_update - обновляет сведения о трансляции. Также контроллер поста проверяет надо ли записывать трансляцию, причём пользователь может в реальном времени начинать и останавливать запись. Для таких действий мы будем использовать очередь Laravel для того, чтобы они выполнялись в одном потоке. Создадим службу для очередей Laravel:

[Unit]
Description=The Deyen Live Video Platform Laravel Queue Worker Daemon

[Service]
User=www-data
Group=www-data
Restart=on-failure
ExecStart=/usr/bin/php /var/www/live.example.org/artisan queue:work
ExecReload=/usr/bin/php /var/www/live.example.org/artisan queue:restart

[Install]
WantedBy=multi-user.target

для приёма команд от службы очередей laravel на стороне nginx создадим виртуальный хост на 82 порту:

server {
	listen 127.0.1.2:82;
	root /var/www/live.example.org-hls/public_html;
	index index.html index.m3u8;
	server_name live.example.org;

	location / {
		try_files $uri $uri/ =404;
	}
	
	location /control {
        rtmp_control all;
	add_header Access-Control-Allow-Origin "*";
    }
}

Теперь можно вовремя трансляции управлять поведением записи, а также кикать клиентов-вещателей.

Задание на старт записи:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Http;
use App\Models\Mediafile;
use App\Models\Post;

class StartRec implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $post;

    public function __construct(Post $post)
    {
        $this->post = $post;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        if ($this->post->record == false) {
            $this->post->record = true;
            $r = Http::get(env('APP_URL').":82/control/record/start?rec=rec1&app=show&name={$this->post->stream_name}");
            $v = new Mediafile;
            $v->org_id = $this->post->org_id;
            $v->user_id = $this->post->user_id;
            $m = [0 => ""];
            preg_match('/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-[0-9]+_rec.flv/i', $r, $m);
            $v->uri = 'public/rec/'.$m[0];
            $v->sha256checksum = hash('sha256', $v->uri, true);
            $v->save();
            $this->post->video_id = $v->id;
            Post::where('stream_name', $this->post->stream_name)->update(['video_id' => $v->id, 'record' => true]);
        }
    }
}

Теперь про контроллер взаиморасчётов:

<?php

namespace App\Http\Controllers;

use Illuminate\Support\Facades\Auth;
use Illuminate\Http\Request;
use Barryvdh\DomPDF\Facade\Pdf;
use App\Models\Inout;
use App\Models\Org;
use App\Models\User;

class InoutController extends Controller
{

    public function show($id = 0)
    {
        if ($id > 0) {
            return view('inout', ['inouts' => [Inout::limitByUser()->findOrFail($id)]]);
        } else {
            return view('inout', ['inouts' => Inout::limitByUser()->orderByDesc('id')->paginate(10)]);
        }
    }

    public function show_balance()
    {
        return view('inout-balance', ['inouts'=> Inout::getBalances()->limitByUser()->paginate(10)]);
    }

    public function getkvit(Request $request)
    {
        $validatedData = $request->validate([
            'user_id' => ['required', 'numeric'],
            'org_id' => ['required', 'numeric']
        ]);
        $org = Org::findOrFail($validatedData['org_id']);
        $user = User::findOrFail($validatedData['user_id']);
        $qs = ["ST00012", "Name={$org->fintitle}","PersonalAcc={$org->personal_acc}", "BankName={$org->bank_name}", "BIC={$org->bic}", "CorrespAcc={$org->corresp_acc}", "PayeeINN={$org->inn}", "KPP={$org->kpp}", "CBC={$org->kbk}", "OKTMO={$org->oktmo}", "Purpose=ID {$user->id} {$org->purpose}", "DrawerStatus={$org->drawer_status}", "PersAcc={$user->id}"];
        $ms = implode('|',$qs);
        $pdf = PDF::loadView('pdf/kvit', ['org' => $org, 'user' => $user, 'ms' => $ms]);
        return $pdf->download('kvit.pdf');
    }

    public function edit()
    {
        if (in_array(Auth::user()->access_level, [2, 3, 4])) {
            return view('inout-add');
        }
    }

    public function store(Request $request)
    {
        if (in_array(Auth::user()->access_level, [2, 3, 4])) {
            $inout = new Inout;
            $validatedData = $request->validate([
                'title_doc' => ['required', 'string', 'max:64'],
                'number_doc' => ['required', 'string', 'max:64'],
                'date_doc' => ['required', 'date'],
                'user_id' => ['required', 'numeric'],
                'sum' => ['required', 'numeric']
            ]);
            $inout->fill($validatedData);
            $inout->org_id = Auth::user()->org_id;
            $inout->total = $inout->balance;
            $inout->save();
            return redirect()->route('inouts');
        }
    }
}

У нас по каждому пользователю по каждой организации ведется отдельный лицевой счёт взаиморасчётов. Самого лицевого счёта как бы нет, это лишь состояние отношений между организацией и пользователем. Это полезно когда мы с юридической точки зрения не хотим быть платежным агентом и все платежи между клиентами и организациями проходят на прямую. Так же добавим метод оплаты в пользу организации по реквизитам с формированием квитанции в PDF с QR-кодом. За это отвечает функция getkvit. Не составит труда добавить и другие методы оплаты. Не буду вставлять коды Blade шаблонов, иначе статья станет слишком длинной.

Проект опубликован на Github https://github.com/deyen01/dlvp как свободное программное обеспечение.

Комментарии (4)


  1. balbeshka
    20.06.2022 06:18
    +4

    Какая мощность у вашего сервера и сколько клиентов он держит? Какой поток при этом входящий и выходящий?


    1. deyen Автор
      20.06.2022 11:39

      это зависит в первую очередь от nginx. Само приложение не занимается обработкой видеоданных.


      1. Kenya
        20.06.2022 11:59

        Выходит, заголовок немного некорректный. Не приложение для видеотрансляции, а настройка nginx для видеотрансляций


        1. deyen Автор
          20.06.2022 14:19

          считать RTMP и выдать HLS - это делает nginx с помощью nginx-rtmp-module. Речь не о настройке nginx-rtmp-module, а о приложении. Считаю, что заголовок корректен.