Продолжение статьи https://habr.com/ru/articles/1028290/

Твой ии-агент мог бы сыграть в сериале "Кибердеревня"
Твой ии‑агент мог бы сыграть в сериале «Кибердеревня»

В прошлой серии

Мы поняли, что скелетом агента является долговременное состояние (durable state). Именно оно должно позволить ответить на скучные, но жизненно важные вопросы: какой ход активен, какой шаг уже выполнен, какой job держит lease, какой файл был исходным, какой результат можно выдать пользователю, какое подтверждение еще действительно.

В первой части мы разложили durable state на ход агента, шаг плана и событие. У нас появились такие сущности, как AgentTurn, AgentPlanItem, AgentEvent, и агент уже перестает быть нервным генератором текста, который живет ровно до первого рестарта процесса.

Но трех таблиц мало. Нужны еще разрешения, состояние диалоги/сессии, состояние проекта, фоновые задачи, механизм обработки фоновых задач (lease), счетчик и политика повторов, закладка событий (event cursor) и санитарная обработка payload‑ов (payload sanitizer).

Что добавляется после AgentTurn, AgentPlanItem и AgentEvent

Минимальный набор первой части можно расширить так:

Сущность

Что хранит

Зачем нужна

ApprovalGrant

Выданное пользователем разрешение

Не спрашивать повторно одно и то же действие в рамках допустимого scope

SessionContext

Активный turn, профиль агента, краткую историю, pending approval

Восстановить диалог и текущую сцену сессии

ProjectContext

Активный проект, файлы, настройки, текущую операцию

Не дать двум тяжелым операциям одновременно менять один проект

BackgroundJob

Длинную операцию вне HTTP‑запроса

Например, парсинг, workbook‑операции, retry, progress, cancellation

WorkerHeartbeat

Присутствие и занятость исполнителя

Отличить долгую работу от умершего worker‑а

Durable payload policy

Правила сохранения payload‑ов

Не складывать base64, секреты и гигантские строки в event log

ApprovalGrant: подтверждение тоже должно быть durable

Approval — это юридически важная запись о том, что пользователь разрешил действие с конкретным scope. Если подтверждение живет только в памяти процесса, то после рестарта агент снова спросит то же самое или, что хуже, решит продолжить без понятного основания.

Разрешения привязаны к session_id, project_id, tool_name, mode, scope и expires_at. Это правильная форма: подтверждение не становится вечным. Пользователь мог разрешить править файлы на этом проекте, но это не значит, что агент получил право трогать все проекты, все файлы и все будущие операции.

Хороший approval grant должен быть узким. В идеале scope описывает не человеческую фразу «можно», а машинно проверяемые границы: project_id, tool_name, режим only_missing, срок действия. Тогда executor может принять решение без повторного похода к LLM.

Класс AprrovalGrant, она же таблица для хранения разрешений (прав доступа) на выполнение определенных действий или использование инструментов в какой‑то системе, может выглядеть следующим образом

class ApprovalGrant(Base):
    __tablename__ = "approval_grants"
 
    grant_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    session_id: Mapped[str] = mapped_column(String(200), index=True)
    project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
    tool_name: Mapped[str] = mapped_column(String(120), index=True)
    mode: Mapped[str] = mapped_column(String(40), index=True)
    scope: Mapped[dict] = mapped_column(JSON, default=dict)
    reason: Mapped[str | None] = mapped_column(Text, nullable=True)
    expires_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

Разбор полей (колонок):

  • grant_id: Уникальный идентификатор каждой записи (UUID). Генерируется автоматически.

  • session_id: ID сессии пользователя. Позволяет понять, в рамках какого сеанса выдано разрешение.

  • project_id: Связь с конкретным проектом (может быть пустым).

  • tool_name: Название инструмента, к которому запрашивается доступ (например, «база_данных», «отправка_email»).

  • mode: Режим доступа (например, «чтение», «запись» или «админ»).

  • scope: Дополнительные параметры в формате JSON. Позволяет хранить сложные настройки доступа в виде словаря.

  • reason: Текстовое описание того, зачем это разрешение было выдано.

  • expires_at: Срок годности разрешения. Если время вышло, доступ аннулируется.

  • created_at: Время создания записи (автоматически ставится текущее время UTC).

SessionContext: агент должен помнить, где он находится

Пока у Гэндальфа не было Session Context, он был серым
Пока у Гэндальфа не было Session Context, он был серым

SessionContext — это durable‑состояние диалога. Не transcript, не полный лог сообщений и не «вся память агента», а компактная техническая карточка текущей сессии.

Если AgentTurn отвечает на вопрос «какой запрос сейчас выполняется», то SessionContext отвечает на вопрос «в какой сцене находится пользователь и агент».

Например:

  • какой turn_id сейчас активен;

  • есть ли незавершенное подтверждение;

  • какой проект открыт;

  • какой профиль агента выбран;

  • какой краткий summary уже построен;

  • с какого события UI нужно продолжить чтение после reconnect;

  • какие операции сейчас нельзя запускать параллельно.

То есть SessionContext нужен не для философской «памяти», а для скучной инженерной магии: закрыли вкладку, обновили страницу, перезапустили backend, worker умер, пользователь вернулся через час — и система все еще понимает, что происходит.

Примерная структура:

class SessionContext(Base):
    __tablename__ = "session_contexts"

    session_id: Mapped[str] = mapped_column(String(200), primary_key=True)
    user_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
    project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    active_turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
    active_job_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    status: Mapped[str] = mapped_column(String(40), default="idle", index=True)
    agent_profile: Mapped[str] = mapped_column(String(80), default="default")

    summary: Mapped[str | None] = mapped_column(Text, nullable=True)

    pending_approval: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    event_cursor: Mapped[int] = mapped_column(default=0)
    context_version: Mapped[int] = mapped_column(default=1)

    last_user_message_at: Mapped[datetime | None] = mapped_column(nullable=True)
    last_agent_event_at: Mapped[datetime | None] = mapped_column(nullable=True)

    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

Поле

Что хранит

Зачем нужно

session_id

ID сессии

Главный ключ состояния диалога

user_id

Пользователь

Изоляция сессий и прав

active_project_id

Текущий проект

Понимать рабочий контекст

active_turn_id

Текущий ход агента

Восстановить незавершенный turn

agent_profile

Режим/персона/настройки агента

Например, «код‑ревьюер», «переводчик», «аналитик»

summary

Сжатая история

Не тащить весь transcript в каждый prompt

pending_approval

Ожидаемое подтверждение

Не потерять confirm после рестарта

event_cursor

Последнее доставленное событие

Догнать UI после reconnect

status

active, waiting_user, running, idle

Быстро понять состояние сессии

updated_at

Время обновления

Отладка, TTL, чистка старых сессий

Важно: SessionContext не должен превращаться в помойку. В него не надо складывать весь prompt, все ответы модели, base64 файлов и простыню traceback‑ов. Для этого есть AgentEvent, файловое хранилище, blob storage и отдельные job‑таблицы.

Хороший SessionContext маленький, скучный и восстанавливаемый.

ProjectContext: агент не должен пилить один проект двумя руками одновременно

Еще одна сущность, которая быстро становится необходимой, — ProjectContext.

Сессия отвечает за диалог. Проект отвечает за рабочую область.

Пользователь может открыть один проект в нескольких вкладках,, потом еще что‑то сделать, потом нажать «повторить». Если система не хранит durable‑состояние проекта, два job‑а могут одновременно начать менять одни и те же файлы.

И получится не AI‑agent, а кибердеревенский комбайн, который одной рукой чинит забор, второй рукой уже его сносит.

сlass ProjectContext(Base):
    __tablename__ = "project_contexts"

    project_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    owner_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    active_operation_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    operation_lock: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    latest_output_file_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    settings: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    status: Mapped[str] = mapped_column(String(40), default="idle", index=True)

    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

ProjectContext хранит:

Поле

Что хранит

project_id

ID проекта

owner_id

Владелец

active_operation_id

Текущая тяжелая операция

operation_lock

Мягкая блокировка проекта

latest_output_file_id

Последний результат

settings

Настройки проекта

status

idle, processing, needs_review, failed

updated_at

Последнее изменение

Это не обязательно жесткий database lock. Чаще достаточно прикладной блокировки: «в этом проекте уже идет операция типа workbook_write, вторую такую же не запускаем».

Например, можно разрешить читать файл и строить preview, но запретить одновременно две операции, которые пишут результат в один и тот же output slot.

BackgroundJob: долгая работа не должна жить внутри HTTP‑запроса

Твой агент запустил бэкграунд джоб
Твой агент запустил бэкграунд джоб

Если операция может занять больше пары секунд, она должна стать job‑ом.

HTTP‑запрос может принять задачу, проверить права, создать AgentTurn, положить BackgroundJob в очередь и вернуть пользователю состояние: «задача принята». А дальше работает worker.

Класс джоб будет таким

class Job(Base): tablename = “jobs”

class Job(Base):
    __tablename__ = "operation_jobs"

    job_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)

    turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
    project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    type: Mapped[str] = mapped_column(String(80), index=True)
    status: Mapped[str] = mapped_column(String(40), default="queued", index=True)

    attempt: Mapped[int] = mapped_column(default=0)
    max_attempts: Mapped[int] = mapped_column(default=3)

    next_attempt_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)

    lease_owner: Mapped[str | None] = mapped_column(String(200), nullable=True, index=True)
    lease_expires_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)

    progress_seq: Mapped[int] = mapped_column(default=0)

    input: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    output: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    error: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

Поле

Что хранит

job_id

ID задачи

turn_id

К какому turn относится

project_id

В каком проекте выполняется

type

parse_workbook, render, export

status

queued, running, completed, failed, cancelled

attempt

Номер попытки

max_attempts

Лимит повторов

next_attempt_at

Когда можно retry

lease_owner

Какой worker забрал задачу

lease_expires_at

Когда lease протухает

progress_seq

Монотонный номер progress‑события

input

Санитизированный input

output

Ссылка на результат

error

Классифицированная ошибка

Ключевой момент — worker не просто берет задачу. Он атомарно claim‑ит ее:

UPDATE background_jobs SET status = 'running', lease_owner = :worker_id, lease_expires_at = :now + interval '2 minutes' WHERE job_id = :job_id AND status = 'queued' AND (next_attempt_at IS NULL OR next_attempt_at <= :now);

Если обновилась одна строка — worker владеет задачей. Если ноль строк — кто‑то уже забрал.

Lease нужен, потому что worker может умереть. Не «вернуть ошибку», не «аккуратно завершиться», а просто исчезнуть. После истечения lease_expires_at другой worker может подобрать задачу и продолжить или перезапустить ее с учетом idempotency.

EventCursor: UI должен догонять события, а не молиться на websocket

Live stream — это приятно, но websocket не является durable state.

Пользователь закрыл ноутбук, сеть моргнула, вкладка перезагрузилась. Если события жили только в памяти процесса, прогресс потерян. Поэтому UI должен читать события из AgentEvent по cursor‑у.

Условный сценарий:

  1. UI подписался на события turn‑а.

  2. Получил события до event_seq = 42.

  3. Соединение оборвалось.

  4. UI reconnect‑ится и говорит: «дай события после 42».

  5. Backend читает durable event log и отдает 43, 44, 45....

Так интерфейс перестает зависеть от идеальной сети.

class EventCursor(Base):
    __tablename__ = "event_cursors"

    cursor_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)

    session_id: Mapped[str] = mapped_column(String(200), index=True)
    turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    consumer_id: Mapped[str] = mapped_column(String(200), index=True)

    last_event_seq: Mapped[int] = mapped_column(BigInteger, default=0)

    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    __table_args__ = (
        UniqueConstraint(
            "session_id",
            "turn_id",
            "consumer_id",
            name="uq_event_cursor_consumer",
        ),
    )

event_seq лучше делать монотонным внутри turn_id или session_id. Не надо использовать только timestamp: у двух событий может быть одинаковое время, а порядок все равно важен.

Управление агентом с ходами и состояниями чем-то напоминает пошаговую стратегию
Управление агентом с ходами и состояниями чем‑то напоминает пошаговую стратегию

Durable payload policy: event log не мусорный бак

Отдельно стоит прописать политику payload‑ов.

Почти в каждом агенте рано или поздно появляется соблазн: «давайте просто положим весь JSON в event payload». Через месяц в event log лежат base64-файлы, токены доступа, гигантские HTML‑страницы, персональные данные и ответы модели на 300 килобайт.

Правило простое: durable event должен хранить факт, ссылку и короткий summary, а не весь мир.

Плохо:

{ "event_type": "file_processed", "payload": { "file_base64": "UEsDBBQAAAA...", "openai_api_key": "sk-...", "full_html": "<html>..." } }

Хорошо:

{ "event_type": "file_processed", "payload": { "file_id": "file_123", "mime_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "rows_count": 1842, "result_file_id": "file_456" } }

Payload sanitizer должен вырезать:

  • секреты;

  • base64 и бинарные данные;

  • слишком длинные строки;

  • полные prompt‑ы без необходимости;

  • персональные данные, если они не нужны для восстановления процесса.

Идеально, если sanitizer применяется централизованно перед записью AgentEvent, а не «по договоренности между разработчиками». Договоренность живет до первого пятничного hotfix‑а.

Все это ложится в один централизованный слой перед добавлением AgentEvent

import json
import re
from typing import Any


SECRET_KEY_RE = re.compile(
    r"(api[_-]?key|token|secret|password|authorization|cookie|access[_-]?token|refresh[_-]?token)",
    re.IGNORECASE,
)

BASE64_RE = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")


class EventPayloadSanitizer:
    max_depth = 8
    max_string_length = 2_000
    max_list_items = 100
    max_payload_bytes = 32_000

    def sanitize(self, payload: dict[str, Any] | None) -> dict[str, Any] | None:
        if payload is None:
            return None

        sanitized = self._sanitize_value(payload, depth=0)

        encoded = json.dumps(sanitized, ensure_ascii=False, default=str)
        if len(encoded.encode("utf-8")) > self.max_payload_bytes:
            return {
                "summary": "payload_too_large",
                "original_size_bytes": len(encoded.encode("utf-8")),
            }

        return sanitized

    def _sanitize_value(self, value: Any, depth: int) -> Any:
        if depth > self.max_depth:
            return "[redacted:max_depth]"

        if isinstance(value, dict):
            result = {}

            for key, item in value.items():
                key_str = str(key)

                if SECRET_KEY_RE.search(key_str):
                    result[key_str] = "[redacted:secret]"
                    continue

                result[key_str] = self._sanitize_value(item, depth + 1)

            return result

        if isinstance(value, list):
            items = value[: self.max_list_items]
            result = [self._sanitize_value(item, depth + 1) for item in items]

            if len(value) > self.max_list_items:
                result.append(f"[truncated:{len(value) - self.max_list_items}_items]")

            return result

        if isinstance(value, str):
            return self._sanitize_string(value)

        return value

    def _sanitize_string(self, value: str) -> str:
        if self._looks_like_base64(value):
            return "[redacted:base64]"

        if len(value) > self.max_string_length:
            return value[: self.max_string_length] + f"...[truncated:{len(value)} chars]"

        return value

    def _looks_like_base64(self, value: str) -> bool:
        compact = value.strip()

        if len(compact) < 256:
            return False

        if len(compact) % 4 != 0:
            return False

        return bool(BASE64_RE.fullmatch(compact))

Как выглядит путь одного запроса

Соберем все вместе на сценарии: пользователь просит, например, обработать файл и применить проектные настройки.

  • API принимает запрос и нормализует session_id. Если есть client_turn_id или idempotency_key, система проверяет, не запускали ли такой turn раньше.

  • AgentService создает AgentTurn и строит TurnPlan. Шаги плана попадают в AgentPlanItem.

  • ApprovalService оценивает риск. Если нужен confirm, в SessionContext сохраняется pending_approval, а в agent_events появляется approval_requested.

  • Пользователь подтверждает. Система создает ApprovalGrant с ограниченным scope и снимает pending_approval.

  • Executor запускает шаг. Для долгой операции создается Job или WorkbookJob, а turn получает события tool_started и job_queued.

  • Worker атомарно claim‑ит job, выставляет lease_owner и lease_expires_at, обновляет progress_seq и публикует progress.

  • UI читает live stream. Если вкладка закрылась, после reconnect он догоняет события из БД.

  • При успехе job получает completed и output_file_id, проект обновляет last_used_file_id, turn получает финальное событие.

  • При retryable‑ошибке job возвращается в queued с next_attempt_at. При terminal‑ошибке сохраняются error и классификация отказа.

В коде контракт между слоями можно выразить вот так

class AgentRequestHandler:
    def handle(self, request: AgentRequest) -> AgentTurn:
        session_id = normalize_session_id(request.session_id)

        with self.db.transaction():
            existing_turn = self.turns.find_by_idempotency_key(
                session_id=session_id,
                idempotency_key=request.idempotency_key or request.client_turn_id,
            )
            if existing_turn is not None:
                return existing_turn

            turn = self.agent_service.create_turn(
                session_id=session_id,
                project_id=request.project_id,
                user_input=request.input,
                idempotency_key=request.idempotency_key,
                client_turn_id=request.client_turn_id,
            )

            plan = self.agent_service.build_plan(turn)
            self.agent_service.save_plan_items(turn, plan)

            approval = self.approval_service.assess(turn, plan)

            if approval.required:
                self.sessions.set_pending_approval(
                    session_id=session_id,
                    approval=approval.to_pending_payload(),
                )
                self.events.write(
                    turn_id=turn.turn_id,
                    session_id=session_id,
                    type="approval_requested",
                    payload=approval.to_event_payload(),
                )
                return turn

            self.executor.enqueue_ready_steps(turn, plan)

            return turn

И важная часть дял воркера:

class JobWorker:
    def run_once(self) -> None:
        job = self.jobs.claim_next(
            lease_owner=self.worker_id,
            lease_seconds=60,
        )

        if job is None:
            return

        try:
            self.events.write(
                turn_id=job.turn_id,
                job_id=job.job_id,
                project_id=job.project_id,
                type="job_started",
                payload={"job_id": str(job.job_id), "type": job.type},
            )

            output = self.execute(job)

            self.jobs.complete(job.job_id, output=output)

            self.events.write(
                turn_id=job.turn_id,
                job_id=job.job_id,
                project_id=job.project_id,
                type="job_completed",
                payload=output,
            )

        except RetryableJobError as exc:
            self.jobs.schedule_retry(job.job_id, error=exc.to_payload())

        except TerminalJobError as exc:
            self.jobs.fail(job.job_id, error=exc.to_payload())

Телеграм канал автора, где он что‑то пишет про ML, NLP и разработку

Комментарии (0)