В PHP 8.1 Fibers появился как новая функция языка. Я часто вижу, что многие люди называют fibers (волокна) threads (потоками), что неверно и, я думаю, дает некоторым представление о том, что PHP теперь каким-то образом стал многопоточным. Возможно люди имеют неправильное представление о том, что такое fibers (и, возможно, что такое асинхронное программирование), и я надеюсь, что смогу помочь исправить ситуацию, предоставив практический пример того, как их можно использовать.

Что такое Fibers?

Хотя fibers не являются threads, они могут помочь сделать ваш код более эффективным при выполнении нескольких задач быстрее, что на первый взгляд может показаться многопоточностью (думаю, отсюда и путаница). Fibers позволяют вашему коду тратить меньше времени на ожидание внешних ресурсов, позволяя вам параллельно инициировать несколько запросов, а затем ждать их завершения (в любом порядке).

Чтобы воспользоваться преимуществами fibers, ваша ситуация должна соответствовать нескольким условиям:

  1. Вы имеете дело с чем-то внешним по отношению к PHP.

  2. Ваш ресурс должен быть внешним по отношению к PHP, чтобы его можно было обрабатывать параллельно с вашим кодом PHP. PHP по-прежнему является однопоточным, поэтому пока выполняется конкретный fiber, больше ничего в вашем скрипте выполняться не будет. Это означает, что фактическую работу необходимо перенести в отдельный процесс. Обычными сценариями, отвечающими этому требованию, являются сетевые запросы или подпроцессы.

Вам необходимо иметь возможность запрашивать внешний ресурс таким образом, чтобы не блокировать выполнение вашего сценария.

Например, сложный запрос к базе данных не соответствует этому требованию, поскольку нет возможности продолжить работу сценария во время выполнения запроса и собрать результаты позже (используя стандартные расширения php).

Пример

В этом примере мы рассмотрим сценарий, который перебирает каталог видеофайлов и создает 30-секундный клип для каждого файла с помощью FFMpeg. Сначала мы посмотрим, как типичный синхронный код справится с поставленной задачей, а затем постепенно перенесем этот код в асинхронный формат с помощью fibers.

Синхронный путь

Перебираем каталог и для каждого файла и выполняем exec процесса ffmpeg для преобразования.

<?php

$start = microtime(true);
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $source = $item->getPathname();
        $destination = getTempDestination();
        $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, $source, $destination);
        exec($cmd, $output, $ret);
        if ($ret !== 0){
            throw new \RuntimeException('Failed to create clip.');
        }

        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}
$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

Запустив это в папке с 19 эпизодами сериала «Scrubs», я могу обработать все видео за 243,1 секунды. Это занимает некоторое время, потому что в конечном итоге я использую только около 50% общей мощности моего процессора (благодаря -threads 1, добавленному для демонстрационных целей).

Я мог бы ускорить этот процесс, если бы мог запускать два или три экземпляра параллельно. Для того, чтобы код сделал это, потребуется сначала перейти на неблокирующий способ запуска ffmpeg, а затем реализовать fiber, используя этот новый неблокирующий exec в качестве исполняемой функции fiber.

Создание неблокирующего exec

Чтобы правильно использовать fibers, нам нужно начать с неблокирующего exec. Это код, который запускает наш процесс ffmpeg, но продолжает работать, пока процесс ffmpeg выполняет свою работу, а не ждет результата. Мы можем добиться этого, заменив простой вызов функции exec на proc_open. Эта функция позволяют процессу запуститься, пока наш код продолжает работать. Затем в цикле, опрашивая статус другого процесса, чтобы узнать, завершен он или нет, и вернется только тогда, когда ffmpeg выполнит свою задачу.

<?php

$start = microtime(true);
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $source = $item->getPathname();
        $destination = getTempDestination();
        createVideoClip($ffmpeg, $source, $destination);

        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}
$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

function createVideoClip(string $ffmpeg, string $source, string $destination) : array{
    $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, $source, $destination);

    $stdout = fopen('php://temporary', 'w+');
    $stderr = fopen('php://temporary', 'w+');
    $streams = [
        0 => ['pipe', 'r']
        , 1 => $stdout
        , 2 => $stderr
    ];

    $proc = proc_open($cmd, $streams, $pipes);
    if (!$proc){
        throw new RuntimeException('Unable to launch download process');
    }

    do {
        usleep(1000); //Wait 1ms before checking
        $status = proc_get_status($proc);
    } while ($status['running']);

    proc_close($proc);
    fclose($stdout);
    fclose($stderr);
    $success = $status['exitcode'] === 0;
    if ($success){
        return [$source, $destination];
    } else {
        throw new \RuntimeException('Unable to perform conversion');
    }
}

С точки зрения кода итерации каталога эта функция блокируется так же, как и exec. Разница в том, что теперь мы сами выполняем блокировку, используя цикл опроса, который мы сможем изменить позже при реализации fibers.

Представляем Fiber

Теперь, когда у нас есть неблокирующая функция создания клипов, мы можем изменить итерацию каталога, чтобы создать новый fiber для каждого файла, который мы хотим преобразовать, используя эту функцию в качестве вызываемой.

<?php

$fiberList = [];
$start = microtime(true);

foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        [$source, $destination] = $fiber->getReturn();
        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}

Итак, теперь мы используем fibers, верно? Код должен быть быстрее, верно? Нет еще нет. Конечно, мы «используем» Fiber для запуска нашей функции создания видеоклипов, но этот код по-прежнему на 100% синхронен и будет обрабатывать только одно видео за раз, так же медленно, как и старый код exec.

Делаем это асинхронно

Чтобы воспользоваться преимуществами fibers, нам нужно сделать код асинхронным и добавить Fiber::suspend в нашу функцию createVideoClip. Это секретный соус, который позволяет PHP продолжать работу по созданию клипов для других файлов, пока ffmpeg обрабатывает текущий файл. В нашей функции подходящим местом для размещения этого вызова приостановки является цикл опроса, заменяющий существующий вызов usleep.

do {
    Fiber::suspend();
    $status = proc_get_status($proc);
} while ($status['running']);

Теперь, когда fiber будет приостановлен (suspend), нам также необходимо возобновить его в какой-то момент, чтобы фактически получить результат создания видеоклипа, иначе оно просто никогда не завершится. Для этого нам нужно собрать все fibers, которые мы создаем при переборе каталога

<?php

$fiberList=[];
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
    }
}

затем пройдитесь по fibers, возобновляя каждое из них по очереди, пока все они не закончатся и мы не сможем получить результат.

<?php

while ($fiberList){
    foreach ($fiberList as $idx => $fiber){
        if ($fiber->isTerminated()){
            [$source, $destination] = $fiber->getReturn();
            echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            unset($fiberList[$idx]);
        } else {
            $fiber->resume();
        }
    }
}

Теперь наш код наконец-то стал асинхронным, и все задания по созданию видеоклипов будут выполняться параллельно. Если вы проследите за своими системными процессами после запуска сценария, вы обнаружите, что для каждого видеофайла немедленно запускается отдельный процесс ffmpeg, и все они начинают работать одновременно. Для моей папки Scrubs это 19 процессов ffmpeg, запущенных одновременно, а общее время выполнения сократилось до 173 секунд.

Слишком много параллелизма!

Иметь 19 одновременных процессов конвертации видео ffmpeg — это слишком много для моего 8-ядерного процессора. Конечно, это быстрее, но теперь мы теряем время, пытаясь бежать слишком много. ЦП будет тратить много времени на переключение контекста между процессами, а не на создание видеоклипов. Вероятно, все было бы еще лучше, если бы мы могли выполнять только несколько преобразований одновременно, а не все.

Добавить ограничение параллелизма возможно, но перед этим нам нужно немного обобщить код, чтобы сделать его более понятным и пригодным для повторного использования. Имеющийся у нас цикл, обрабатывающий fibers до их завершения, необходимо обобщить в небольшую функцию, которую мы можем вызвать и которая будет ждать завершения fibers.

<?php

/**
 * @param Fiber[] $fiberList
 * @param int|null $completionCount
 *
 * @return Fiber[]
 */
function waitForFibers(array &$fiberList, ?int $completionCount = null) : array{
    $completedFibers = [];
    $completionCount ??= count($fiberList);
    while (count($fiberList) && count($completedFibers) < $completionCount){
        usleep(1000);
        foreach ($fiberList as $idx => $fiber){
            if ($fiber->isSuspended()){
                $fiber->resume();
            } else if ($fiber->isTerminated()){
                $completedFibers[] = $fiber;
                unset($fiberList[$idx]);
            }
        }
    }

    return $completedFibers;
}

Вот простая функция, которая принимает список fibers по ссылке и, при необходимости, подсчитывает, сколько fibers следует ожидать. Любое из завершившихся fibers будет удалено из списка и возвращено из функции для обработки вызывающим кодом. По умолчанию функция будет ждать завершения всех fibers в списке, однако добавление дополнительного счетчика позволит ей вернуться после завершения как минимум такого количества fibers (может быть больше, чем запрошено).

Создав эту функцию ожидания, мы можем обновить наш код, добавив ограничение параллелизма. Таким образом, мы можем тратить меньше времени на переключение контекста и больше времени на создание клипов, что, надеемся, еще больше ускорит процесс.

<?php

$concurrency = 3;
$fiberList = [];
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
        if (count($fiberList) >= $concurrency){
            foreach (waitForFibers($fiberList, 1) as $fiber){
                [$source, $destination] = $fiber->getReturn();
                echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            }
        }
    }
}

foreach (waitForFibers($fiberList) as $fiber){
    [$source, $destination] = $fiber->getReturn();
    echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
}

Отлично, теперь с ограничением параллелизма, равным 3, мой процессор не так утомлен переключением контекста и может выполнить все 19 эпизодов Scrubs всего за 143,7 секунды. Намного лучше, чем первоначальные 243,1 секунды, которые потребовались для синхронного выполнения.

Полный пример

<?php

$ffmpeg = getenv('FFMPEG_BIN') ?: 'ffmpeg';
$concurrency = $argv[1] ?? 3;
$fiberList = [];
$start = microtime(true);

foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
        if (count($fiberList) >= $concurrency){
            foreach (waitForFibers($fiberList, 1) as $fiber){
                [$source, $destination] = $fiber->getReturn();
                echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            }
        }
    }
}

foreach (waitForFibers($fiberList) as $fiber){
    [$source, $destination] = $fiber->getReturn();
    echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
}

$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

/**
 * @param Fiber[] $fiberList
 * @param int|null $completionCount
 *
 * @return Fiber[]
 */
function waitForFibers(array &$fiberList, ?int $completionCount = null) : array{
    $completedFibers = [];
    $completionCount ??= count($fiberList);
    while (count($fiberList) && count($completedFibers) < $completionCount){
        usleep(1000);
        foreach ($fiberList as $idx => $fiber){
            if ($fiber->isSuspended()){
                $fiber->resume();
            } else if ($fiber->isTerminated()){
                $completedFibers[] = $fiber;
                unset($fiberList[$idx]);
            }
        }
    }

    return $completedFibers;
}

function getTempDestination() : string{
    $destination = tempnam(sys_get_temp_dir(), 'video');
    unlink($destination);
    $dir = dirname($destination);
    $file = basename($destination, '.tmp');

    return $dir . DIRECTORY_SEPARATOR . $file . '.mp4';
}

function createVideoClip(string $ffmpeg, string $source, string $destination) : array{
    $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, $source, $destination);

    $stdout = fopen('php://temporary', 'w+');
    $stderr = fopen('php://temporary', 'w+');
    $streams = [
        0 => ['pipe', 'r']
        , 1 => $stdout
        , 2 => $stderr
    ];

    $proc = proc_open($cmd, $streams, $pipes);
    if (!$proc){
        throw new RuntimeException('Unable to launch download process');
    }

    do {
        Fiber::suspend();
        $status = proc_get_status($proc);
    } while ($status['running']);

    proc_close($proc);
    fclose($stdout);
    fclose($stderr);
    $success = $status['exitcode'] === 0;
    if ($success){
        return [$source, $destination];
    } else {
        throw new \RuntimeException('Unable to perform conversion');
    }
}

Комментарии (13)


  1. impwx
    24.08.2023 09:52
    +3

    Неужели в стандартной библиотеке нет готовой реализации функции waitForFibers и ее каждый должен писать сам?


    1. bel1k0v Автор
      24.08.2023 09:52
      -1

      В стандартной нет. А где-то есть в других языках реализация?


      1. impwx
        24.08.2023 09:52
        +1

        Ну конечно. На одном fire and forget далеко не уедешь, нужны какие-то механизмы синхронизации. В JS есть Promise.all / any, в C# - Task.WhenAll / WhenAny, в Java - CompletableFuture.allOf / anyOf, про другие языки навскидку не скажу но можно легко нагуглить при необходимости


        1. bel1k0v Автор
          24.08.2023 09:52
          +2

          Промисы и в php есть, но не в стандартной комплектации, см. ReactPHP. С помощью fiber вы возвращаете то значение, которое вам нужно, а не обёртку, как в случае с js, плюс вы можете прервать этот вызов в любой момент времени. Этот подход немного отличается от тех примеров, что вы даёте. См. определение: https://www.php.net/manual/en/language.fibers.php


        1. NiceDay
          24.08.2023 09:52
          +2

          в стандартной библиотеке нет, но, к счастью, в php подход к использованию библиотек чудесно стандартизирован и ни у кого не вызывает трудностей.

          вот вам all, any
          https://github.com/reactphp/promise/blob/3.x/src/functions.php

          а вот даже awaitAnyN, и как раз для футур, основанных на файберах -
          https://github.com/amphp/amp/blob/3.x/src/Future/functions.php


          1. impwx
            24.08.2023 09:52

            Со сторонними библиотеками понятно. Меня просто удивил контраст: когда я последний раз активно писал на PHP, в стандартной библиотеке можно было найти вообще всё что угодно, хоть чёрта лысого.


            1. Rukis
              24.08.2023 09:52

              Хм, не знал что Тейлор в доке PHP засветился


  1. ubx7b8
    24.08.2023 09:52
    +1

    Что-то всё это выглядит совсем не проще, чем использование pcntl_fork и pcntl_waitpid


    1. NiceDay
      24.08.2023 09:52
      +2

      потому что файберы не предполаются для использования обычным пользователем руками.
      но это отличный кирпичик, на котором можно построить библиотеки, работающие при необходимости и в блокирующем режиме, и в асинхронном. например, - https://github.com/amphp/amp/blob/3.x/examples/async.php

      если мы вернемся к вопросу управления процессами, то вот так можно синхронно запустить один пооцесс -https://github.com/amphp/process/blob/2.x/examples/basic-command.php
      вот так несколько одновременно - https://github.com/amphp/process/blob/2.x/examples/ping-many.php

      а вот так решить проблему, которую решает автор статьи - https://github.com/amphp/process/blob/2.x/examples/ffmpeg.php


  1. TyVik
    24.08.2023 09:52

    Давно не пишу на php, а разве там нет thread-pool? Собираете список тасок, которые надо выполнить и кидаете их туда. Масштабируете по количеству ядер. Для python это будет ProcessPoolExecutor.


    1. bel1k0v Автор
      24.08.2023 09:52

      1. TyVik
        24.08.2023 09:52

        Вот хотелось бы сравнения по скорости с этой штукой. Если она умеет в 3 потока обрабатывать 19 задач, то мне кажется, выигрыша в производительности и не будет. Всё равно упрёмся в математику.

        Ну и после вот такого примера https://www.php.net/manual/en/language.fibers.php#127282 всё равно не понятно зачем вручную использовать управление задачами, если его можно не использовать? Чтобы уместить всё в один поток? Тогда не понятно зачем программисту об этом думать - можно ж в рантайм эту проблему свалить.


        1. bel1k0v Автор
          24.08.2023 09:52

          В принципе никто не мешает вам сравнить - лично мне, лень. Остальные вопросы не понял.