Привет, хабровчане! На связи Алиса — тимлид в e-commerce агентстве KISLOROD. Мы ежедневно имеем дело с большими каталогами, сложной коммерцией и 1С, которая дышит в затылок. И однажды мы решили сделать невозможное: подружить Битрикс с Laravel... 

В первой части мы доказали, что Laravel и Битрикс могут жить вместе, как кот и пылесос — с уважением к личному пространству. Во второй — выносим бизнес-логику, не ломая 1С. Рассказываю, как устроить единый вход без шаринга сессий, ускорить каталог с OpenSearch, внедрить outbox-публикации и навести порядок в наблюдаемости. 

Шаг А. База: один вход, два бэкенда

Пользователю все равно, сколько у вас бэкендов и какой там API Gateway. Он просто хочет залогиниться один раз — и спокойно работать. Без дополнительных форм, без двойной авторизации, без подвисаний между фронтом и админкой. Идея простая: вход единый, контур общий, работа — бесшовная.

Чтобы это работало, мы делаем Laravel центром аутентификации. Он выпускает токен по JWT с подписью RS256 и кладет его в защищенную HttpOnly-cookie btjwt на общий домен .example.ru. После этого весь остальной бэкенд просто проверяет подпись токена по публичному ключу, доступному на /.well-known/jwks.json. Нет необходимости делиться секретами между сервисами — у каждого есть ключ, Laravel остается единственным источником доверия.

Такой подход дает устойчивость к сбоям: если токен невалиден — Laravel просто отклоняет запрос. Если cookie пропала — Битрикс продолжает работать с обычными сессиями. Даже если JWKS временно недоступен — используем кэш или отпускаем пользователя как анонима. При этом вся логика обернута аккуратно: ничего не ломается, UX не портится.

Предвкушая ехидные комментарии, сразу отвечу, зачем RS256, а не HMAC. Потому что секрет остается у Laravel, а проверка работает по открытому ключу. Безопаснее, проще и не требует лишних договоренностей.

Контракты простые, прозрачные и пригодные для автогенерации клиентского SDK:

Метод

URL

Назначение

POST

/auth/login

Логин пользователя, выдача btjwt

POST

/auth/logout

Отзыв токена и завершение сессии

GET

/auth/me

Проверка текущего пользователя

GET

/.well-known/jwks.json

Публичные ключи для проверки JWT

С маршрутами разобрались — тут все лаконично. Но чтобы система не обернулась дырой в безопасности или лотереей с куками, важно держать в голове базовые параметры и договоренности. Эта таблица с тем, что должно быть задано и проверено:

Параметр

Значение

Cookie

btjwt, HttpOnly, Secure, SameSite=Lax, TTL 10–15 минут, домен .example.ru

Алгоритм

RS256, с ротацией ключей по kid, несколько ключей одновременно в JWKS

JWT claims

iss, aud, sub, email, name, roles, iat, exp, jti, amr

JWKS

JSON Web Key Set на /.well-known/jwks.json, кэшируется, проверяется по kid

Параметры мы подобрали не от балды, а чтобы сбалансировать безопасность и удобство:

  1. 10–15 минут жизни токена — хватает, чтобы не плодить фантомные сессии, но и не мучить пользователя постоянными логинами. Если юзер активен — обновим.

  2. Безопасность как в аптеке: HttpOnly, Secure, SameSite=Lax, cookie на поддомен .example.ru, проверка iss, aud, exp, kid.

  3. Ключи можно ротировать через kid. Поддерживается сразу несколько для безболезненного перехода и обновлений.

Скрытый текст

Подготовка ключа и конфиг

Генерация ключа:

# приватный ключ
openssl genrsa -out storage/jwt/jwt_rsa.pem 4096
# публичный ключ
openssl rsa -in storage/jwt/jwt_rsa.pem -pubout -out storage/jwt/jwt_rsa.pub
chmod 600 storage/jwt/jwt_rsa.*

.env в Laravel:

APP_URL=https://api.example.ru
APP_KEY_ID=kid-1
JWT_COOKIE_DOMAIN=.example.ru
JWT_TTL_MIN=15

Nginx на edge:
# пробрасываем X-Request-Id и не трогаем HttpOnly-cookie
add_header X-Request-Id $request_id;

location /api/ {
  proxy_pass http://laravel_upstream;
  proxy_set_header Host $host;
  proxy_set_header X-Request-Id $request_id;
}

Контроллер JWKS

<?php

namespace App\Http\Controllers;

use Illuminate\Http\JsonResponse;
use Illuminate\Http\Response;

final class JwksController extends Controller
{
    public function show(): JsonResponse
    {
        $pem = file_get_contents(storage_path('jwt/jwt_rsa.pub'));
        $details = openssl_pkey_get_details(openssl_pkey_get_public($pem));

        $n = rtrim(strtr(base64_encode($details['rsa']['n']), '+/', '-_'), '=');
        $e = rtrim(strtr(base64_encode($details['rsa']['e']), '+/', '-_'), '=');

        return response()->json([
            'keys' => [[
                'kty' => 'RSA',
                'alg' => 'RS256',
                'use' => 'sig',
                'kid' => config('app.key_id', env('APP_KEY_ID', 'kid-1')),
                'n'   => $n,
                'e'   => $e,
            ]],
        ], Response::HTTP_OK, ['Content-Type' => 'application/json']);
    }
}

Маршрут:
// routes/web.php
use App\Http\Controllers\JwksController;

Route::get('/.well-known/jwks.json', [JwksController::class, 'show']);

Контроллер авторизации

<?php

namespace App\Http\Controllers;

use Firebase\JWT\JWT;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Auth;

final class AuthController extends Controller
{
    public function login(Request $request): JsonResponse
    {
        $credentials = $request->validate([
            'email'    => ['required', 'email'],
            'password' => ['required', 'string'],
        ]);

        if (! Auth::attempt($credentials)) {
            return response()->json(['ok' => false, 'error' => 'invalid'], 401);
        }

        $user = Auth::user();
        $ttl  = (int) env('JWT_TTL_MIN', 15);

        $payload = [
            'iss' => config('app.url'),
            'aud' => config('app.url'),
            'iat' => time(),
            'exp' => time() + $ttl * 60,
            'sub' => (string) $user->getAuthIdentifier(),
            'email' => (string) $user->email,
            'kid' => env('APP_KEY_ID', 'kid-1'),
        ];

        $privateKey = file_get_contents(storage_path('jwt/jwt_rsa.pem'));
        $jwt = JWT::encode($payload, $privateKey, 'RS256', $payload['kid']);

        return response()
          ->json(['ok' => true])
            ->cookie(
                'btjwt',
                $jwt,
                $ttl,
                '/',
                env('JWT_COOKIE_DOMAIN', '.example.ru'),
                true, // secure
                true, // httpOnly
                false, // raw
                'Lax' // SameSite
            );
    }

    public function logout(): JsonResponse
    {
        return response()
            ->json(['ok' => true])
            ->cookie(
                'btjwt',
                '',
                -1,
                '/',
                env('JWT_COOKIE_DOMAIN', '.example.ru'),
                true,
                true,
                false,
                'Lax'
            );
    }

    public function me(Request $request): JsonResponse
    {
        return response()->json([
            'ok'   => true,
            'user' => $request->user(),
        ]);
    }
}

Маршруты:
// routes/api.php
use App\Http\Controllers\AuthController;

Route::post('/auth/login',  [AuthController::class, 'login']);
Route::post('/auth/logout', [AuthController::class, 'logout']);
Route::get('/auth/me',      [AuthController::class, 'me'])->middleware('bitrix.jwt');

Проверка токена на Laravel-роутах

Сервис проверки по JWKS:

<?php

namespace App\Auth;

use Firebase\JWT\JWT;
use Firebase\JWT\Key;
use Illuminate\Support\Facades\Cache;
use RuntimeException;

final class JwtVerifier
{
    public function verify(string $jwt): object
    {
        $pem = Cache::remember('jwks:pem', 3600, function (): string {
            $json = file_get_contents(config('app.url') . '/.well-known/jwks.json');
            $jwks = json_decode($json, true, flags: \JSON_THROW_ON_ERROR);
            return $this->jwkToPem($jwks['keys'][0]);
        });

        $payload = JWT::decode($jwt, new Key($pem, 'RS256'));

        // Дополнительные проверки
        if (($payload->iss ?? '') !== config('app.url')) {
            throw new RuntimeException('Bad issuer');
        }
        if (($payload->aud ?? '') !== config('app.url')) {
            throw new RuntimeException('Bad audience');
        }

        return $payload;
    }

    private function jwkToPem(array $jwk): string
    {
        $n = base64_decode(strtr($jwk['n'], '-_', '+/'));
        $e = base64_decode(strtr($jwk['e'], '-_', '+/'));

        $rsa = pack('Ca*a*', 0x30, $this->asn1Len(
            pack('Ca*a*', 0x02, $this->asn1Int($n), 0x02) . $this->asn1Int($e)
        ));

        $spki = pack(
            'Ca*a*',
            0x30,
            $this->asn1Len(
                pack('Ca*', 0x30, $this->asn1Len(pack('H*', '300d06092a864886f70d0101010500'))) .
                pack('Ca*a*', 0x03, $this->asn1Len("\0" . $rsa), "\0" . $rsa)
            ),
            ''
        );

        return "-----BEGIN PUBLIC KEY-----\n" .
            chunk_split(base64_encode($spki), 64, "\n") .
            "-----END PUBLIC KEY-----\n";
    }

    private function asn1Len(string $x): string
    {
        $l = strlen($x);
        if ($l < 128) {
            return chr($l) . $x;
        }
        $bin = ltrim(pack('N', $l), "\0");
        return chr(0x80 | strlen($bin)) . $bin . $x;
    }

    private function asn1Int(string $x): string
    {
        $trim = ltrim($x, "\0");
        if ($trim === '' || (ord($trim[0]) & 0x80)) {
            $trim = "\0" . $trim;
        }
        return chr(strlen($trim)) . $trim;
    }
}

Middleware:

<?php

namespace App\Http\Middleware;

use App\Auth\JwtVerifier;
use App\Models\User;
use Closure;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Symfony\Component\HttpFoundation\Response;

final class BitrixJwtMiddleware
{
    public function __construct(private readonly JwtVerifier $verifier)
    {
    }

    /**
     * @param Closure(Request): Response $next
     */
    public function handle(Request $request, Closure $next): Response|JsonResponse
    {
        $jwt = $request->cookie('btjwt') ?: $request->bearerToken();
        if ($jwt === null) {
            return response()->json(['message' => 'Unauthenticated'], 401);
        }

        try {
            $payload = $this->verifier->verify($jwt);
        } catch (\Throwable) {
            return response()->json(['message' => 'Invalid token'], 401);
        }

        $user = User::firstOrCreate(
            ['bitrix_id' => (int) ($payload->sub ?? 0)],
            ['email' => isset($payload->email) ? (string) $payload->email : null]
        );

        auth()->login($user);

        return $next($request);
        }
}

Регистрация:

// app/Http/Kernel.php
protected $routeMiddleware = [
    // ...
    'bitrix.jwt' => \App\Http\Middleware\BitrixJwtMiddleware::class,
];

Битрикс: авторизация по JWT

Класс-обвязка:

<?php

namespace Project\Bitrix\Auth;

use Bitrix\Main\Application;
use Bitrix\Main\Web\HttpClient;
use Firebase\JWT\JWT;
use Firebase\JWT\Key;

final class JwtSso
{
    private const JWKS_CACHE_KEY = 'jwks-cache';
    private const JWKS_TTL       = 3600;

    public static function authorizeFromCookie(): void
    {
        global $USER;

        if ($USER->IsAuthorized()) {
            return;
        }

        $request = Application::getInstance()->getContext()->getRequest();
        $jwt = (string) $request->getCookie('btjwt');

        if ($jwt === '') {
            return;
        }

        try {
            $publicPem = self::fetchJwksPublicPem();
            $payload = JWT::decode($jwt, new Key($publicPem, 'RS256'));

            // Дополнительные проверки
            $iss = (string) ($payload->iss ?? '');
            $aud = (string) ($payload->aud ?? '');
            if ($iss !== 'https://api.example.ru' || $aud !== 'https://api.example.ru') {
                return;
            }

            $userId = (int) ($payload->sub ?? 0);
            if ($userId > 0) {
                $USER->Authorize($userId, false, true);
            }
        } catch (\Throwable) {
            // игнорируем, Битрикс продолжит как аноним
        }
    }

    private static function fetchJwksPublicPem(): string
    {
        $cache = Application::getInstance()->getManagedCache();
        if ($cache->read(self::JWKS_TTL, self::JWKS_CACHE_KEY)) {
            /** @var string $pem */
            $pem = $cache->get(self::JWKS_CACHE_KEY);
            return $pem;
        }

        $http = new HttpClient(['socketTimeout' => 2, 'streamTimeout' => 2]);
        $json = (string) $http->get('https://api.example.ru/.well-known/jwks.json');
        $jwks = json_decode($json, true, flags: \JSON_THROW_ON_ERROR);

        $pem = self::jwkToPem($jwks['keys'][0]);
        $cache->set(self::JWKS_CACHE_KEY, $pem);

        return $pem;
    }

    private static function jwkToPem(array $jwk): string
    {
        $n = base64_decode(strtr($jwk['n'], '-_', '+/'));
        $e = base64_decode(strtr($jwk['e'], '-_', '+/'));

        $rsa = pack('Ca*a*', 0x30, self::asn1Len(
            pack('Ca*a*', 0x02, self::asn1Int($n), 0x02) . self::asn1Int($e)
        ));

        $spki = pack(
            'Ca*a*',
     0x30,
            self::asn1Len(
                pack('Ca*', 0x30, self::asn1Len(pack('H*', '300d06092a864886f70d0101010500'))) .
                pack('Ca*a*', 0x03, self::asn1Len("\0" . $rsa), "\0" . $rsa)
            ),
            ''
        );

        return "-----BEGIN PUBLIC KEY-----\n" .
            chunk_split(base64_encode($spki), 64, "\n") .
            "-----END PUBLIC KEY-----\n";
    }

    private static function asn1Len(string $x): string
    {
        $l = strlen($x);
        if ($l < 128) {
            return chr($l) . $x;
        }
        $bin = ltrim(pack('N', $l), "\0");
        return chr(0x80 | strlen($bin)) . $bin . $x;
    }

    private static function asn1Int(string $x): string
    {
        $trim = ltrim($x, "\0");
        if ($trim === '' || (ord($trim[0]) & 0x80)) {
            $trim = "\0" . $trim;
        }
        return chr(strlen($trim)) . $trim;
    }
}

Подключение хука:

<?php

use Project\Bitrix\Auth\JwtSso;

AddEventHandler('main', 'OnBeforeProlog', static function (): void {
    JwtSso::authorizeFromCookie();
});

Безопасность и углы

  • Cookie btjwt только HttpOnly + Secure + SameSite=Lax, домен .example.ru.

  • Короткий TTL и необязательное обновление токена по активности.

  • Ротация ключей: держим 2–3 kid в JWKS, новый приватный ключ включаем заранее.

  • Проверка iss и aud, допустимое расхождение часов 30–60 секунд.

  • Для SPA с кросс-доменом - белый список CORS и credentials: include.

В общем, живем недолго, но безопасно, не усложняем, но и не халтурим.

Шаг Б. Outbox: когда «Сохранить» не значит «Подожди»

Интеграции в Битриксе часто выглядят как шутка с плохой развязкой. Нажимаешь «Сохранить», а дальше все висит, потому что в этот момент система синхронно ждет ответа от CRM, доставки или еще какого-нибудь внешнего сервиса, который сегодня не в духе. Один таймаут, и админка превращается в лотерею.

Outbox-паттерн спасает от этой боли. Вместо того, чтобы дергать интеграции «в лоб», мы сначала записываем событие в свою таблицу, а уже потом фоново публикуем его в очередь. Все в рамках одной транзакции. Контент-менеджер нажимает кнопку, данные сохраняются, можно выдыхать. Остальное система разрулит сама.

Что там происходит:

Событие

Что делает система

Что это дает

Вы сохраняете товар, цену или заказ

Вместе с сохранением пишется запись в таблицу outbox

Все в одной транзакции без шансов потерять событие

Событие включает: тип, объект, полезную нагрузку, время и статус

Все, что нужно, чтобы потом доставить и обработать

Можно отследить, повторно отправить или проигнорировать дубликат

Фоновый агент просматривает outbox и публикует события в Redis Streams

Работает в фоне, не тормозит интерфейс

Даже если Redis «лежит», сохранение не ломается

Если Redis не ответил или случился сбой

Событие остается в таблице, повторная попытка будет позже

Ретрай с умным бэк-офом, ничего не теряется

Если все прошло успешно

Статус события меняется на published, мы логируем request_id

Есть прозрачный след: что, когда и куда ушло

Если что-то не так

Уходит в Dead Letter Queue (DLQ) — специальную «карантинную» зону

Ничего не падает, все под контролем.

Если раньше падение RabbitMQ превращало админку в болото, теперь все иначе. Сохранили и пошли дальше, а outbox сам разберется, когда и куда слать.

Что отправляем в стримы:

Тип события

Что произошло

catalog.element.updated

Обновили товар

price.changed

Изменилась цена

stock.changed

Обновились остатки

order.created

Новый заказ

order.updated

Изменили существующий заказ

Что получаем взамен:

  • админка сохраняет мгновенно, даже если CRM отвечает через раз;

  • все события доходят куда надо — с логами, ретраями и наблюдаемостью;

  • любую ситуацию можно отмотать по occurred_at и восстановить целиком.

ORM-модель:

Скрытый текст
<?php

namespace Project\Bitrix\Outbox;

use Bitrix\Main\Entity;
use Bitrix\Main\Type\DateTime;

final class OutboxEventTable extends Entity\DataManager
{
    public static function getTableName(): string
    {
        return 'project_outbox_events';
    }

    public static function getMap(): array
    {
        return [
            (new Entity\IntegerField('ID'))
                ->configurePrimary(true)
                ->configureAutocomplete(true),

            (new Entity\StringField('EVENT_TYPE'))
     ->configureRequired(true),

            (new Entity\StringField('AGGREGATE_TYPE'))
                ->configureRequired(true),

            (new Entity\StringField('AGGREGATE_ID'))
                ->configureRequired(true),

            (new Entity\TextField('PAYLOAD_JSON'))
                ->configureRequired(true),

            (new Entity\DatetimeField('OCCURRED_AT'))
                ->configureRequired(true)
                ->configureDefaultValue(static fn() => new DateTime()),

            (new Entity\DatetimeField('AVAILABLE_AT'))
                ->configureRequired(true)
                ->configureDefaultValue(static fn() => new DateTime()),

            (new Entity\IntegerField('ATTEMPTS'))
                ->configureDefaultValue(0),

            (new Entity\StringField('STATUS'))
                ->configureDefaultValue('pending'),

            (new Entity\TextField('LAST_ERROR')),

            (new Entity\StringField('HASH'))
                ->addValidator(new Entity\Validator\Length(1, 64))
                ->configureRequired(true),
        ];
    }
}

Примечание: в миграции/установщике модуля нужно будет добавить уникальный индекс на HASH, а также составной индекс STATUS+AVAILABLE_AT для выбора батчей.

Запись события из Битрикс в момент изменения

Пример: элемент каталога отредактирован - пишем событие catalog.element.updated.

<?php

use Bitrix\Main\Web\Json;
use Project\Bitrix\Outbox\OutboxEventTable;

AddEventHandler('iblock', 'OnAfterIBlockElementUpdate', static function(array $arFields): void {
    if (! isset($arFields['ID'], $arFields['IBLOCK_ID'])) {
        return;
    }

    $aggregateType = 'catalog.element';
    $aggregateId   = (string) $arFields['ID'];
    $eventType     = 'catalog.element.updated';

    $payload = [
        'id' => (int) $arFields['ID'],
        'iblock_id' => (int) $arFields['IBLOCK_ID'],
        'changed' => array_keys((array) ($arFields['FIELDS'] ?? [])),
        'ts' => time(),
        'request_id' => \Bitrix\Main\Diag\Helper::getRequestId(),
    ];

    $json = Json::encode($payload, JSON_UNESCAPED_UNICODE);
    $hash = hash('sha256', $eventType . '|' . $aggregateType . '|' . $aggregateId . '|' . $json);

    OutboxEventTable::add([
        'EVENT_TYPE' => $eventType,
        'AGGREGATE_TYPE' => $aggregateType,
        'AGGREGATE_ID' => $aggregateId,
        'PAYLOAD_JSON' => $json,
        'HASH' => $hash,
        // OCCURRED_AT/AVAILABLE_AT проставятся по умолчанию
    ]);
});

Тот же приём работает для заказов, цен и остатков: OnSaleOrderSaved, изменения в catalog_price, синхронизация складов.

Публикация в шину: фоновый агент Битрикс

<?php

namespace Project\Bitrix\Outbox;

use Bitrix\Main\Application;
use Bitrix\Main\Result;
use Redis;

final class OutboxPublisher
{
    private const BATCH_SIZE = 200;
    private const STREAM_MAP = [
        'catalog.element.updated' => 'catalog.events',
        'price.changed' => 'price.events',
        'stock.changed' => 'stock.events',
        'order.created' => 'order.events',
        'order.updated' => 'order.events',
    ];

    public static function run(): string
    {
        $connection = Application::getConnection();
        $sql = "
            SELECT * FROM project_outbox_events
            WHERE STATUS = 'pending' AND AVAILABLE_AT <= NOW()
            ORDER BY ID ASC
            LIMIT " . self::BATCH_SIZE;

        $rows = $connection->query($sql)->fetchAll();

        if ($rows === []) {
            return 'OutboxPublisher::run();';
        }
     $redis = new Redis();
        $redis->connect('127.0.0.1', 6379);
        $redis->setOption(Redis::OPT_READ_TIMEOUT, 1);

        foreach ($rows as $row) {
            $stream = self::STREAM_MAP[$row['EVENT_TYPE']] ?? null;
            if ($stream === null) {
                self::markFailed((int) $row['ID'], 'Unknown event type');
                continue;
            }

            try {
                // XADD stream * fields
                $id = $redis->xAdd($stream, '*', [
                    'event_type' => $row['EVENT_TYPE'],
                    'aggregate' => $row['AGGREGATE_TYPE'] . ':' . $row['AGGREGATE_ID'],
                    'payload_json' => $row['PAYLOAD_JSON'],
                    'hash' => $row['HASH'],
                    'occurred_at' => (string) $row['OCCURRED_AT'],
                ]);

                self::markPublished((int) $row['ID'], (string) $id);
            } catch (\Throwable $e) {
                self::markRetry((int) $row['ID'], (string) $e->getMessage());
            }
        }

        return 'OutboxPublisher::run();';
    }

    private static function markPublished(int $id, string $externalId): void
    {
        OutboxEventTable::update($id, [
            'STATUS' => 'published',
            'ATTEMPTS' => new \Bitrix\Main\DB\SqlExpression('ATTEMPTS + 1'),
            'LAST_ERROR' => null,
        ]);
    }

    private static function markRetry(int $id, string $error): void
    {
        // простой экспоненциальный бэкофф: +1, +2, +4, ... минут
        $attempts = 0;
        $row = OutboxEventTable::getByPrimary($id)->fetch();
       if ($row) {
            $attempts = (int) $row['ATTEMPTS'];
        }

        $minutes = min(60, 2 ** max(0, $attempts));
        $available = (new \Bitrix\Main\Type\DateTime())->add("+" . $minutes . " minutes");

        OutboxEventTable::update($id, [
            'STATUS' => 'pending',
            'ATTEMPTS' => $attempts + 1,
            'LAST_ERROR' => $error,
            'AVAILABLE_AT' => $available,
        ]);
    }

    private static function markFailed(int $id, string $error): void
    {
        OutboxEventTable::update($id, [
            'STATUS' => 'failed',
            'LAST_ERROR' => $error,
        ]);
    }
}

Агент регистрируется штатно, с периодом в 1 минуту. Даже если Redis недоступен, события остаются в outbox и публикуются позже.

Потребление событий в Laravel: группы, идемпотентность, DLQ

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Redis;

final class ConsumeEvents extends Command
{
    protected $signature = 'events:consume {group=svc} {--streams=catalog.events,price.events,stock.events,order.events}';
    protected $description = 'Читает бизнес-события из Redis Streams и диспатчит обработчики';

    public function handle(): int
    {
        $group = (string) $this->argument('group');
        $streams = array_map('trim', explode(',', (string) $this->option('streams')));

        $r = new Redis();
        $r->connect('127.0.0.1', 6379);
        $r->setOption(Redis::OPT_READ_TIMEOUT, 1);

        // создаём группы, если их ещё нет
        foreach ($streams as $s) {
            try {
                $r->xGroup('CREATE', $s, $group, '0', true);
            } catch (\RedisException) {
                // группа уже есть
            }
        }

        $streamPairs = [];
        foreach ($streams as $s) {
            $streamPairs[$s] = '>';
        }

        while (true) {
            $chunk = $r->xReadGroup($group, gethostname() ?: 'consumer', $streamPairs, 100, 2000);
            if (! $chunk) {
                continue;
            }

            foreach ($chunk as $stream => $messages) {
                foreach ($messages as $id => $fields) {
                    try {
                        $this->dispatch($stream, $fields);
                        $r->xAck($stream, $group, [$id]);
                    } catch (\Throwable $e) {
                        // перекидываем в DLQ
                        $r->xAdd($stream . '.dlq', '*', $fields + ['error' => $e->getMessage()]);
                        $r->xAck($stream, $group, [$id]);
                    }
                }
            }
        }
    }
 
    /**
     * @param array<string,string> $fields
     */
    private function dispatch(string $stream, array $fields): void
    {
        $type = $fields['event_type']   ?? '';
        $json = $fields['payload_json'] ?? '{}';
        $hash = $fields['hash']         ?? '';
        $aggKey = $fields['aggregate']    ?? '';

        // простая идемпотентность на 24 часа
        $key = "idem:{$type}:{$aggKey}:{$hash}";
        if (! Cache::add($key, 1, 86400)) {
            return; // уже обрабатывали
        }

        match ($type) {
            'catalog.element.updated' => \App\Jobs\ReindexProduct::dispatch($json),
            'price.changed' => \App\Jobs\RefreshPricing::dispatch($json),
            'stock.changed' => \App\Jobs\RefreshInventory::dispatch($json),
            'order.created',
            'order.updated' => \App\Jobs\SyncOrder::dispatch($json),
            default => null,
        };
    }
}

Пример обработчика:

<?php
namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

final class ReindexProduct implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries   = 5;
    public int $backoff = 10;

    public function __construct(private readonly string $payloadJson)
    {
    }

    public function handle(): void
    {
        $data = \json_decode($this->payloadJson, true, flags: \JSON_THROW_ON_ERROR);

        // 1) обновляем витрины svc_catalog_*
        // 2) индексируем документ в OpenSearch
        // 3) инвалидируем кэши

        // ... доменная логика тут
    }
}

Контракты и версионирование

Минимальный пример AsyncAPI-фрагмента в yaml-файле:

asyncapi: 2.6.0
info:
  title: Project Events
  version: 1.0.0
channels:
  catalog.events:
    subscribe:
      message:
        name: catalog.element.updated
        payload:
          type: object
          required: [id, iblock_id, changed, ts]
          properties:
            id: { type: integer }
            iblock_id: { type: integer }
            changed: { type: array, items: { type: string } }
            ts: { type: integer }
            request_id: { type: string }

Добавляем request_id везде - это связывает логи Битрикс, публишера и консьюмера.

В результате — спокойная админка, предсказуемый Битрикс и разработка без бесконечных «а у нас опять ничего не работает». Все работает. 

Шаг В. Наблюдаемость: видеть, а не догадываться

Когда в проде что-то идет не так, первыми обычно узнает не спецы по мониторингу, а менеджеры.  Клиенты пишут, звонят с вопросами, почему не работает и без конкретики, а нам — гадай, где именно все застопорилось.

Мы от этого устали, и теперь у нас все прозрачно: каждая заявка, каждый товар, каждое событие — под прицелом метрик, логов и трассировок. 

Что внедрили:

Компонент

Что дает

X-Request-Id сквозь систему

Один идентификатор на весь путь — чтобы логи и трейсы были связаны

OpenTelemetry

Визуализируем цепочки вызовов — от контроллера до OpenSearch и обратно

Sentry

Ошибки в Laravel и Битриксе фиксируются с контекстом и пользователями

Prometheus

Снимаем метрики: задержки, ошибки, глубину очередей, повторы

Что отслеживаем:

  • разницу между временем события (occurred_at) и его фактической обработкой;

  • P95 latency — реальное ощущение скорости глазами пользователя;

  • долю ошибок 5xx — если больше 0,5%, включаем маяк;

  • нагрузку на очереди: как быстро обрабатываются, не растет ли DLQ;

  • повторяющиеся ошибки и ретраи — сигнал, что кто-то упорно ломится в закрытую дверь.

Скрытый текст

Фрагмент конфига nginx:

# в http{} или server{} блоке
map $http_x_request_id $req_id {
    default $http_x_request_id;
    ""      $request_id;        # если пришёл без id - сгенерируем
}

server {
    # ...

    # для всех ответов
    add_header X-Request-Id $req_id;

    location /api/ {
        proxy_set_header X-Request-Id $req_id;
        proxy_pass http://laravel_upstream;
    }

    location / {
        proxy_set_header X-Request-Id $req_id;
        proxy_pass http://bitrix_upstream;
    }
}

Битрикс: фиксация коррелятора и доступ из кода

<?php

namespace Project\Obs;

final class Correlation
{
    private const HEADER = 'HTTP_X_REQUEST_ID';
    private static ?string $id = null;

    public static function id(): string
    {
        if (self::$id !== null) {
            return self::$id;
        }

        $server = $_SERVER[self::HEADER] ?? '';
        self::$id = $server !== '' ? $server : bin2hex(random_bytes(16));

        // Отдадим назад тем же заголовком
        header('X-Request-Id: ' . self::$id);

        return self::$id;
    }
}

Теперь при записи событий в outbox из предыдущего шага добавляем request_id:

$payload = [
    // ...
    'request_id' => \Project\Obs\Correlation::id(),
];

Laravel: middleware для корреляции

<?php

namespace App\Http\Middleware;

use Closure;
use Illuminate\Http\Request;
use Illuminate\Support\Str;
use Symfony\Component\HttpFoundation\Response;

final class Correlate
{
    public function handle(Request $request, Closure $next): Response
    {
        $id = $request->headers->get('X-Request-Id') ?: Str::uuid()->toString();
        $request->headers->set('X-Request-Id', $id);

        // прокинем в лог-контекст
        \Log::withContext(['request_id' => $id]);

        /** @var Response $response */
        $response = $next($request);
        $response->headers->set('X-Request-Id', $id);

        return $response;
    }
}

Включаем в app/Http/Kernel.php в глобальные middleware.

Sentry: ошибки и перфоманс

Laravel

composer require sentry/sentry-laravel:^4.6
php artisan vendor:publish --provider="Sentry\Laravel\ServiceProvider"

Bitrix

composer require sentry/sentry:^4.0

config/sentry.php:

<?php

return [
    'dsn' => env('SENTRY_LARAVEL_DSN'),
    'traces_sample_rate' => 0.2, // 20 % запросов с перфоманс-трейсами
    'profiles_sample_rate' => 0.0,
    'before_send' => static function (\Sentry\Event $event) {
        // Положим X-Request-Id в event tag, если есть
        $rid = request()?->headers->get('X-Request-Id');
        if ($rid) {
            $event->setTag('x_request_id', $rid);
        }
        return $event;
    },
];

/local/php_interface/init.php:

<?php

\Sentry\init([
    'dsn' => getenv('SENTRY_PHP_DSN'),
    'traces_sample_rate' => 0.1,
    'before_send' => static function (\Sentry\Event $event): \Sentry\Event {
        $rid = $_SERVER['HTTP_X_REQUEST_ID'] ?? null;
        if ($rid) {
            $event->setTag('x_request_id', $rid);
        }
        return $event;
    },
]);

// Пример отправки исключения
try {
    // ... ваш код
} catch (\Throwable $e) {
    \Sentry\captureException($e);
}

OpenTelemetry: трассировка от HTTP до очередей

Laravel: базовая инициализация

composer require open-telemetry/opentelemetry-sdk:^1.0 open-telemetry/exporter-otlp:^1.0

Сервис-провайдер:

<?php

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use OpenTelemetry\API\Globals;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor;
use OpenTelemetry\SDK\Trace\TracerProviderFactory;

final class TelemetryServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $factory = new TracerProviderFactory();
        $provider = $factory->create(); // читает OTEL_* из env
        Globals::registerTracerProvider($provider);

        // Можно добавить BatchSpanProcessor, если используете ручную сборку
        // $provider->addSpanProcessor(new BatchSpanProcessor($exporter));
    }
}

В .env:

OTEL_SERVICE_NAME=laravel-api
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318
OTEL_TRACES_SAMPLER=parentbased_traceidratio
OTEL_TRACES_SAMPLER_ARG=0.2

Оборачиваем критичные места спанами

Контроллер поиска:

use OpenTelemetry\API\Trace\SpanBuilderInterface;
use OpenTelemetry\API\Trace\TracerInterface;
use OpenTelemetry\API\Globals as Otel;

final class CatalogController extends Controller
{
    public function search(Request $request, SearchIndex $index): JsonResponse
    {
        $tracer = Otel::tracerProvider()->getTracer('catalog');

        $span = $tracer->spanBuilder('catalog.search')
            ->setAttribute('http.request_id', $request->header('X-Request-Id'))
            ->setAttribute('query.q', (string) $request->get('q', ''))
            ->startSpan();

        try {
            // ... ваш код поиска
        } finally {
            $span->end();
        }

        // ...
    }
}

Консьюмер Redis Streams из предыдущего шага можно расширить:

use OpenTelemetry\API\Globals as Otel;

$tracer = Otel::tracerProvider()->getTracer('events-consumer');

$span = $tracer->spanBuilder('stream.consume')
    ->setAttribute('stream', $stream)
    ->setAttribute('event_type', $type)
    ->setAttribute('aggregate', $aggKey)
    ->startSpan();

try {
    // обработка
} finally {
    $span->end();
}

Экспортируются спаны в OTLP Collector, дальше - Grafana Tempo/Jaeger на ваш выбор. Главное, чтобы по тегу http.request_id можно было склеить HTTP и worker.

Prometheus: простые метрики

Laravel: метрики HTTP и поиска

composer require promphp/prometheus_client_php:^2.10

Регистрируем репозиторий и экспортёр:

<?php

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use Prometheus\CollectorRegistry;
use Prometheus\RenderTextFormat;
use Prometheus\Storage\InMemory;

final class MetricsServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->app->singleton(CollectorRegistry::class, function () {
            return new CollectorRegistry(new InMemory());
        });
    }
}

Middleware для счётчика и гистограммы:

<?php

namespace App\Http\Middleware;

use Closure;
use Illuminate\Http\Request;
use Prometheus\CollectorRegistry;
use Symfony\Component\HttpFoundation\Response;

final class HttpMetrics
{
    public function __construct(private readonly CollectorRegistry $registry)
    {
    }

    public function handle(Request $request, Closure $next): Response
    {
        $hist = $this->registry->getOrRegisterHistogram(
            'app', 'http_server_duration_seconds',
            'HTTP latency',
            ['route', 'method', 'code'],
            [0.05, 0.1, 0.2, 0.5, 1, 2]
        );

        $counter = $this->registry->getOrRegisterCounter(
            'app', 'http_requests_total',
            'HTTP requests',
            ['route', 'method', 'code']
        );

        $start = \microtime(true);
        /** @var Response $res */
        $res = $next($request);
        $dur = \microtime(true) - $start;

        $labels = [$request->route()?->getName() ?? 'unknown', $request->getMethod(), (string) $res->getStatusCode()];

        $hist->observe($dur, $labels);
        $counter->inc($labels);

        return $res;
    }
}

Эндпоинт /metrics:

<?php

use Illuminate\Support\Facades\Route;
use Prometheus\CollectorRegistry;
use Prometheus\RenderTextFormat;

Route::get('/metrics', function (CollectorRegistry $registry) {
    $renderer = new RenderTextFormat();
    $result = $renderer->render($registry->getMetricFamilySamples());

    return response($result, 200, ['Content-Type' => RenderTextFormat::MIME_TYPE]);
});

Лаг очередей в консьюмере

В команде чтения Streams после парсинга сообщений:

$gauge = app(CollectorRegistry::class)->getOrRegisterGauge(
    'app', 'stream_lag_seconds',
    'Lag between occurred_at and consume time',
    ['stream']
);

$occurred = isset($fields['occurred_at']) ? strtotime($fields['occurred_at']) : null;
if ($occurred) {
    $gauge->set(max(0, time() - $occurred), [$stream]);
}

Политики и алерты

  • SLO для API: P95 150 мс на кэш-хит, 400 мс на мисс. Алерт - когда три 5-минутных окна подряд выше порога.

  • SLO для индексации: лаг P95 < 10 с. Алерт - если лаг > 30 с 10 минут.

  • Ошибки 5xx: доля > 0.5 % в течение 5 минут - алерт.

  • Очереди: длина DLQ > 0 - предупреждение, > N - критический.

Важная деталь - не собирать PII в события и спаны. Прогоняйте фильтры для скрытия e-mail и телефонов, оставляйте технические атрибуты: ids, статусы, размеры выборок, тайминги, коды ошибок.

Когда клиент приходит с проблемой, нам не нужно устраивать квест по логам или гадать, где завис товар. Мы сразу видим, какое событие не дошло, какой сервис сейчас тормозит и сколько времени займет все наверстать.

Наблюдаемость — это про возможность понять, что пошло не так, не теряя время на догадки. Когда в системе что-то ломается, важно не просто увидеть ошибку, а точно знать — где она, что ее вызвало и как быстро можно починить. С таким подходом легче и работать, и отвечать на вопросы, и просто быть уверенным в том, что ты идешь куда надо.

Продолжение следует...

Комментарии (0)