Привет, Хабр!
Redis Streams давно перестали быть экзотикой для любителей CLI и стали нормальным способом гонять события между сервисами. Но у PHP есть своя специфика: один код — два способа конкурентности. Либо Amp с неблокирующим I/O и семафорами, либо Swoole с корутинами. В обоих случаях хочется одного и того же: устойчивые consumer‑группы, ручной ack, автоматический claim зависших сообщений, backpressure, экспоненциальные ретраи и внятный дед‑леттер.
Что именно строим
Задача: шина событий заказов. Продюсер пишет в orders:events
через XADD с триммингом. Несколько воркеров читают из consumer‑группы orders:cg
командой XREADGROUP в блокирующем режиме, подтверждают обработку через XACK, а «застрявшие» записи переезжают на активного потребителя через XAUTOCLAIM. Если событие стабильно падает — отправляем его в orders:events:dlq
и больше не мучаем основной поток. Мониторим задержку группы по XINFO GROUPS
и периодически чистим хвосты.
Дальше два варианта реализации: Amp и Swoole.
Схема событий и продюсер
События тонкие, поэтому в стрим кладём только факт и ключи для догрузки. Полные JSON‑мешки оставьте хранилищу. Поток обрезаем почти точно, чтобы не расти безлимитно.
<?php
// producer.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// XADD + MAXLEN ~ — почти точная обрезка, быстрее точной. Документация так и советует.
// Поддерживается и XTRIM для периодической чистки.
$id = $redis->xAdd('orders:events', 'MAXLEN', '~', 100_000, '*', [
'type' => 'order.created',
'order_id' => 'o-'.bin2hex(random_bytes(8)),
'user_id' => 'u-'.bin2hex(random_bytes(6)),
'ver' => '1',
]);
echo "added $id\n";
Группа потребителей и инварианты
Создаём группу с позиционированием на хвост стрима, чтобы начать с новых записей. Если стрима ещё нет — MKSTREAM.
<?php
// bootstrap.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
try {
// XGROUP CREATE orders:events orders:cg $ MKSTREAM
$redis->xGroup('CREATE', 'orders:events', 'orders:cg', '$', true);
echo "group created\n";
} catch (RedisException $e) {
if (str_contains($e->getMessage(), 'BUSYGROUP')) {
echo "group exists\n";
} else {
throw;
}
}
Чтение ведём через XREADGROUP с ID >
— это новые, ещё никому не доставленные записи. Результат каждого чтения попадает в Pending Entries List до XACK. Это «ровно‑однажды» не обещает, это «как минимум однажды», а значит обработчики обязаны быть идемпотентными.
Историю смотрим через XPENDING: там есть idle‑время и количество доставок. По delivery‑count решаем, когда отправлять в DLQ.
Вариант 1. Amp: неблокирующее чтение, backpressure семафором
Amp даёт неблокирующий Redis‑клиент и примитивы синхронизации. Мы читаем из группы батчами, ограничиваем параллельную обработку LocalSemaphore
, подтверждаем XACK пачками. Для подбора сирот включаем параллельно цикл XAUTOCLAIM.
<?php
// amp-consumer.php
require __DIR__.'/vendor/autoload.php';
use function Amp\Redis\createRedisClient;
use Amp\Sync\LocalSemaphore;
use Revolt\EventLoop;
$stream = 'orders:events';
$group = 'orders:cg';
$consumer = 'c-'.gethostname().'-'.getmypid();
// параметры backpressure
$concurrency = 16; // не больше N одновременных задач
$batchCount = 64; // XREADGROUP COUNT, сколько брать за раз
$blockMs = 5000; // блокирующее ожидание
$minIdleMs = 60_000; // порог для XAUTOCLAIM
$maxDeliveries = 5; // выше — в DLQ
$redis = createRedisClient('redis://127.0.0.1');
$sem = new LocalSemaphore($concurrency);
// цикл автоклейма сирот
EventLoop::repeat(3.0, function () use ($redis, $stream, $group, $consumer, $minIdleMs, $maxDeliveries) {
$start = '0-0';
while (true) {
// XAUTOCLAIM ... <minIdleMs> <start> COUNT 100
// XAUTOCLAIM увеличивает счётчик попыток, если не JUSTID.
$res = $redis->xautoclaim($stream, $group, $consumer, $minIdleMs, $start, count: 100, justid: false);
$start = $res['next'] ?? '0-0';
$entries = $res['messages'] ?? [];
foreach ($entries as $id => $fields) {
// проверим delivery-count через XPENDING (расширенная форма)
$pend = $redis->xpending($stream, $group, $id, $id, 1);
$deliveryCount = $pend[0][3] ?? 1;
if ($deliveryCount > $maxDeliveries) {
// дед-леттер: переносим и ack
$redis->xAdd('orders:events:dlq', '*', [
'orig_id' => $id,
'reason' => 'max-deliveries',
'payload' => json_encode($fields, JSON_UNESCAPED_UNICODE),
]);
$redis->xAck($stream, $group, [$id]);
}
}
if ($start === '0-0' || empty($entries)) break;
}
});
// основной цикл чтения
while (true) {
// XREADGROUP GROUP <g> <c> COUNT <N> BLOCK <ms> STREAMS <key> >
$batch = $redis->xreadgroup($group, $consumer, [$stream => '>'], $batchCount, $blockMs);
if (!$batch || !isset($batch[$stream])) {
continue;
}
foreach ($batch[$stream] as $id => $fields) {
$lock = $sem->acquire();
Amp\async(function () use ($redis, $stream, $group, $id, $fields, $lock) {
try {
// идемпотентный handler
processOrderEvent($fields); // ваша доменная логика
$redis->xAck($stream, $group, [$id]); // подтверждаем только после успеха
} catch (\Throwable $e) {
// Не ack — оставляем в PEL для повторной доставки/claim
} finally {
$lock->release();
}
});
}
}
function processOrderEvent(array $fields): void {
// Эмулируем «безопасный» парсинг
$type = $fields['type'] ?? '';
$orderId = $fields['order_id'] ?? '';
if ($type === 'order.created' && $orderId !== '') {
// ... проводим запись в БД, операции должны быть идемпотентны по order_id
}
}
XREADGROUP блокирует ожидание новых записей и кладёт доставленное в PEL до XACK. XPENDING в расширенной форме возвращает массив с idle и количеством доставок, что и используем для дед‑леттера. XAUTOCLAIM перевешивает владение сообщением и увеличивает счётчик доставок, что позволяет накапливать статистику ретраев. Семафор Amp ограничивает число параллельно работающих задач и тем самым создаёт backpressure на уровне приложения.
В актуальной версии amphp/redis
команды стримов доступны как методы в нижнем регистре. Если версия не знает конкретный метод, отсылайте сырые команды через универсальный вызов клиента.
Вариант 2. Swoole: корутины, канал как семафор, Redis из ext-phpredis
OpenSwoole умеет перевращать многие I/O‑библиотеки в корутинные, в том числе Redis, а канал даёт простой способ ограничить параллелизм. Этот путь часто выбирают, если уже используете Swoole или у вас воркеры с долгими CPU‑участками.
<?php
// swoole-consumer.php
OpenSwoole\Runtime::enableCoroutine(); // хук для php_stream, в т.ч. Redis
$stream = 'orders:events';
$group = 'orders:cg';
$consumer = 'c-'.gethostname().'-'.getmypid();
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// лимит конкурентной обработки
$parallel = 32;
$chan = new Swoole\Coroutine\Channel($parallel);
// периодический автоклейм
Swoole\Coroutine::create(function () use ($redis, $stream, $group, $consumer) {
$minIdleMs = 60_000;
while (true) {
$start = '0-0';
do {
$res = $redis->xAutoClaim($stream, $group, $consumer, $minIdleMs, $start, 100);
$start = $res[0] ?? '0-0';
$msgs = $res[1] ?? [];
foreach ($msgs as $m) {
$id = $m[0];
$pend = $redis->xPending($stream, $group, $id, $id, 1);
$deliveries = $pend[0][3] ?? 1;
if ($deliveries > 5) {
$redis->xAdd('orders:events:dlq', '*', [
'orig_id' => $id,
'reason' => 'max-deliveries',
'payload' => json_encode($m[1], JSON_UNESCAPED_UNICODE),
]);
$redis->xAck($stream, $group, [$id]);
}
}
} while (!empty($msgs) && $start !== '0-0');
Swoole\Coroutine::sleep(3);
}
});
while (true) {
// блокирующий XREADGROUP
$batch = $redis->xReadGroup($group, $consumer, [$stream => '>'], 64, 5000);
if (!isset($batch[$stream])) continue;
foreach ($batch[$stream] as $id => $fields) {
$chan->push(1); // бронируем слот
Swoole\Coroutine::create(function () use ($redis, $chan, $stream, $group, $id, $fields) {
try {
processOrderEvent($fields);
$redis->xAck($stream, $group, [$id]);
} finally {
$chan->pop(); // освобождаем слот
}
});
}
}
function processOrderEvent(array $fields): void {
// доменная логика, та же, что и в Amp-варианте
}
ext‑phpredis даёт нативные методы xReadGroup
, xAck
, xAutoClaim
, xPending
, xGroup
. Включаем корутины рантаймом OpenSwoole, ограничиваем параллелизм размером канала и получаем предсказуемую нагрузку на БД и внешние API.
Экспоненциальные ретраи без таймерных колес
Streams не содержат отложенных сообщений, поэтому для backoff есть рабочих паттерна.
«Пассивный» backoff на PEL: не ack, XAUTOCLAIM подберёт запись по
min-idle-ms
. Грубовато, но просто. Счётчик попыток виден через XPENDING, XCLAIM/XAUTOCLAIM увеличивают его автоматически.«Активный» delay‑queue: при ошибке вы ack‑аете задачу и кладёте её в ZSET
orders:retry
со score равнымnow + backoffMs
. Отдельная корутина/фибра периодически делаетZRANGEBYSCORE <= now LIMIT 100
, вынимает задачи и пере‑XADD«ит в основной стрим сattempt=N+1
.
Пример простого delay‑пуша:
function scheduleRetry(Redis $r, string $stream, array $payload, int $attempt): void {
$base = 1000; // 1s
$max = 60_000; // 60s
$jitter = random_int(0, 250);
$delay = min($base * (2 ** ($attempt - 1)) + $jitter, $max);
$runAt = (int)(microtime(true) * 1000) + $delay;
$r->zAdd('orders:retry', $runAt, json_encode([
'stream' => $stream,
'payload' => $payload,
'attempt' => $attempt,
], JSON_UNESCAPED_UNICODE));
}
Отдельный воркер:
function retryPump(Redis $r): void {
while (true) {
$now = (int)(microtime(true) * 1000);
$items = $r->zRangeByScore('orders:retry', 0, $now, ['limit' => [0, 100]]);
if (!$items) { usleep(200_000); continue; }
foreach ($items as $raw) {
$r->multi();
$r->zRem('orders:retry', $raw);
$job = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
$r->xAdd($job['stream'], '*', [
...$job['payload'],
'attempt' => (string)$job['attempt'],
]);
$r->exec();
}
}
}
Redis нативно не умеет отложенные задачи, рекомендую для этого ZSET с временным score.
Мониторинг
Для принятия решений по масштабированию нужна метрика lag. Возьмём XINFO GROUPS
: там есть lag
— количество записей, которые ещё не доставлены группе. В некоторых ситуациях она может быть недоступна и вернётся nil
, это нормально и задокументировано. Сохранять тренд по lag и по PEL имеет смысл отдельным метриком.
Мини‑функции:
function groupLag(Redis $r, string $stream, string $group): ?int {
$groups = $r->xInfo('GROUPS', $stream);
foreach ($groups as $g) {
if (($g['name'] ?? '') === $group) {
return $g['lag'] ?? null; // может быть null
}
}
return null;
}
function pendingSummary(Redis $r, string $stream, string $group): array {
// XPENDING summary: [minId, maxId, total, perConsumer...]
return $r->xPending($stream, $group);
}
Скейлим количество воркеров, если лаг стабильно растёт на горизонте N минут. Если lag «пилит» вокруг нуля, можно уменьшить число воркеров.
Тест перегрузки и восстановление
Быстрый сценарий: продюсер кладёт 50k событий за короткое время, потребитель ограничен семафором/каналом на 16–32 слота, обработка каждой записи занимает условно 50–100 мс.
1. На старте лаг пойдёт вверх — это нормально.
2. При устойчивом потреблении и отсутствии ошибок лаг будет сходиться к нулю.
3. Если остановить половину воркеров, лаг вырастет; при возврате воркеров — сойдёт.
4. Если включить искусственные ошибки, увидите рост delivery‑count в XPENDING и набивающийся DLQ.
Адекватная эксплуатация
Идемпотентность. Обрабатываем запись повторно без побочек. Минимальный гард — ставим идемпотентный ключ в Redis до начала сайд‑эффектов и снимаем его по TTL. Шаблон через SET key NX EX
работает быстро и атомарно в рамках одной ноды. Ключ строим по бизнес‑идентификатору, не по stream‑ID. Пример:
// идемпотентность по паре {type, order_id}
$key = sprintf('idem:%s:%s', $fields['type'] ?? '', $fields['order_id'] ?? '');
$ok = $redis->set($key, '1', ['nx', 'ex' => 3600]); // 1 час хватает большинству обработок
if ($ok !== true) {
return; // уже обрабатывали, выходим тихо
}
// дальше вызываем внешние API/DB и только затем делаем XACK
Если хотите «жёсткий» дедуп, можно вместо TTL хранить маркер навсегда, но это уже эксплуатационный компромисс.
Размер записей. В стрим кладём короткие факты и идентификаторы. Для роста ограничиваем ключ через MAXLEN
прямо на XADD
, включаем «почти точную» обрезку ~
. Это убирает лишние аллокации и держит ключ компактным. Если нужно чистить по времени, периодически гоняем XTRIM MINID
с порогом.
Таймауты. XREADGROUP
используем с BLOCK
в пределах секунд, не минут. Это нормальный режим и он штатно поддержан. На клиенте ставим разумные readTimeout
и connectTimeout
, иначе зависший сетевой хоп повисит воркер. В phpredis это опции конструктора или setOption
. Пример:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379, 2.5, null, 2.5); // connect/read timeouts ~2.5s
$redis->setOption(Redis::OPT_READ_TIMEOUT, 5.0);
Если поставите NOACK
, надёжности не будет, это осознанный выбор.
Тримминг. Держим размер ключа под контролем двумя способами. На входе — XADD ... MAXLEN ~ <cap>
для дешёвой обрезки. Периодически — XTRIM MAXLEN
или XTRIM MINID
, если нужен порог по ID. Для больших потоков это мастхев.
Диагностика. XPENDING
в «summary» показывает общий объём PEL и min/max ID, а «расширенная» форма по диапазону отдаёт записи вместе с idle и delivery‑count. На этом и строится принятие решений по ретраям и DLQ. Для масштабирования следим за lag
из XINFO GROUPS
— это разница между «всего добавлено» и «логически прочитано группой».
Claim. Предпочтительно XAUTOCLAIM
с min-idle-ms
: он совмещает XPENDING + XCLAIM
и работает скан‑подобно, без ручной пагинации по диапазонам. Плюс гораздо меньше кода на вашей стороне. Помним ограничение XCLAIM
: записей, которых уже нет в PEL или которые вычистили из стрима, он не перетянет.
Порядок действий. Без фанатизма, но строго: проверили идемпотентность, сделали внешние операции, зафиксировали их, потом XACK
. Иначе поймаете «подтвердили, потом упали». Э
Акуратное выключение. На SIGTERM перестаём вызывать XREADGROUP
, дожидаемся текущих хендлеров и отправляем XACK
тем, кто завершился. Остальное останется в PEL и будет подобрано XAUTOCLAIM
по таймауту простоя.
Настройка лаг‑алертов. Если lag
устойчиво растёт — добавляем консьюмеров. Если «пилит» около нуля — можно убирать.
Итоги
В конечном счёте у нас в руках инструмент для устойчивых потоков событий в PHP. Все это применимо везде, где много коротких фактов и побочных эффектов: шина заказов и статусов, фулфилмент/логистика, биллинг и антифрод, рассылки и пуш‑уведомления, приём и обработка вебхуков, интеграции с внешними API с ретраями и джиттером, CRM/ERP‑синхронизации, ETL‑помпы/дедуп,телеметрия и алерты.
Делитесь в комментариях своими кейсами.
Если вы работаете с PHP и сталкиваетесь с задачами, которые выходят за рамки простых веб‑страниц, обратите внимание на курс PHP Developer. Professional. В программе — современные подходы к конкурентности, работа с асинхронными инструментами, устойчивые архитектуры и интеграции с промышленными сервисами. Пройдите бесплатное тестирование по курсу, чтобы узнать, подойдет ли вам курс.
А тем, кто настроен на серьезное системное обучение, рекомендуем рассмотреть Подписку — выбираете курсы под свои задачи, экономите на обучении, получаете профессиональный рост. Узнать подробнее
SukhovPro
Перед курсами PHP, пойду на курсы переводчиков...
" устойчивые consumer-группы, ручной ack, автоматический claim зависших сообщений, backpressure, экспоненциальные ретраи и внятный дед-леттер. "