
TLDR: Агенты все чаще используются с минимальным участием человека в управлении. Однако автономные агенты представляют собой типичные распределённые системы, в которых возможны многочисленные сложные сценарии сбоев. Мы выступаем за отказоустойчивый и функциональный подход к проектированию таких систем с целью обеспечения их надёжности, черпая вдохновение в языке Erlang. Мы внедрили этот подход в новую версию нашей библиотеки с открытым исходным кодом для оркестрации агентов под названием ramure, что упрощает написание надёжного программного обеспечения для агентов.
Раньше использование агентов в основном сводилось к взаимодействию «пользователь-агент», при котором пользователь постоянно управлял действиями модели и обеспечивал их обоснованность. Однако сейчас модели стали настолько умными, что имеет смысл запускать агентов в полностью автономном режиме, а людям оставлять обратную связь только на более высоких уровнях.
Однако обеспечение надёжной работы агентов без постоянного контроля со стороны человека представляет собой как инженерную задачу (т. е. обеспечение надёжной работы каждого компонента этой инфраструктуры), так и сложность проектирования (т. е. разработка процессов, эффективно использующих ресурсы агентов). Для этого потребуется более целенаправленное проектирование системы, уделяющее внимание моментам передачи работы между агентами и вмешательствам человека. Кроме того, такие системы должны обладать способностью к самокоррекции.
Развертывание агентов представляет собой сложную задачу из области распределенных систем
Совокупность агентов, координирующих взаимодействия в разных средах — это сложная распределенную систему. Заставить распределенные системы работать эффективно крайне сложно.
Агенты не являются исключением, и в ходе работы мы заметили такие проблемы, как:
ошибки синхронизации, связанные со временем запуска агентов, получением ими определённых событий, а также с переходом из режима координации в режим бездействия.
ситуации, когда один из агентов в системе придерживается менее строгих критериев успешности, что в итоге ухудшает работу всей системы, например, ослабляет требования к тестированию.
отсутствие четкого разделения ролей и обязанностей агентов, в результате чего они мешают друг другу и замедляют работу системы или используют возможности, которые им не положены.
Многие из подобных проблем имеют аналоги при программировании обычного софта, но дополнительная сложность, связанная с агентами, заключается в том, что от них ожидается выполнение гораздо более сложных задач. Необходимо отслеживать сбои не только на программном уровне, но и на семантическом, то есть следить за тем, выполняют ли эти распределенные компоненты свои функции так, как и предполагалось. Сложность заключается в том, что эти действия — поддержание свойств системы, создание функций, подготовка отчетов и т. д. — нелегко проверить с помощью одного лишь кода.
К счастью, мы обнаружили, что концепции из области распределенного программного обеспечения в сочетании со способностью агентов проверять всё более неоднозначные вещи позволяют нам добиться значительного прогресса в надежной и эффективной работе множества агентов.
Отступление об Erlang
Erlang — это язык для функционального программирования с поддержкой параллелизма, созданный для телекоммуникационных приложений; его архитектура оптимизирована для высокой степени параллелизма и высокой надёжности. Как известно, WhatsApp и Discord работали на инфраструктуре Erlang.
Erlang построен на принципах параллелизма — программы на Erlang работают путём определения и запуска процессов, которые выполняются одновременно. В Erlang:
Запуск и остановка процессов обходятся очень дешево.
Процессы могут взаимодействовать друг с другом и работать параллельно для выполнения задач.
Процессы могут контролироваться другими процессами.
Процессы легко отлаживать и запускать заново.
Такая архитектура способствует параллелизму, модульности и встроенной отказоустойчивости, при которой каждый вычислительный процесс должен достаточно чётко сигнализировать о потенциальном сбое, чтобы другие процессы могли его исправить.
Агенты должны работать на основе отказоустойчивых перезапускаемых процессов
Мы можем ввести аналогичное понятие «агентного процесса» (АП) — совокупности агентов, работающих и взаимодействующих на машинах и имеющих общий жизненный цикл. При разработке качественного программного обеспечения на основе агентов мы сочли полезными следующие особенности:
Запуск и остановка АП обходятся очень дешево.
АП могут взаимодействовать друг с другом и работать параллельно для выполнения задач.
АП могут контролироваться другими АП.
АП легко отлаживать и запускать заново.
Когда мы хотим разрабатывать реальные системы, в которых используются агенты, а не просто обеспечивать их взаимодействие и работу в неструктурированном режиме, нам необходимо учитывать назначение различных компонентов, что означает их успешная работа и как система может восстановить работоспособность в случае их сбоя.
Чтобы пояснить это на конкретном примере: предположим, мы хотим создать пул рабочих процессов АП. Пул рабочих процессов представляет собой набор рабочих агентов, каждый из которых выполняет одну задачу. Пул предоставляет методы для запуска и завершения работы агентов, а также для проверки состояния их работы. Каждому рабочему процессу предоставляются определенные инструменты для отправки задач, а другой агент или программа может добавлять задачи (add_tasks) и проверять состояние пула рабочих процессов.
При разработке этого АП мы должны учитывать все возможные причины сбоев:
базовые сбои программного обеспечения — обрыв соединений с агентами, отключение машин и т. п.
таймауты, когда агенты не переходят к следующему шагу, например, когда рабочий процесс не отправляет данные; наш пул рабочих процессов должен упрощать мониторинг таких ситуаций и генерацию соответствующих событий.is.
один из приёмов, который мы могли бы использовать, — это предоставить рабочим процессам также событие «сбой», чтобы побудить их сигнализировать о завершении даже в случае сбоев
возможно, мы захотим ввести ограничение на использование АП, например, установить предельное количество задач, при превышении которого управление становится затруднительным; поэтому нам нужно генерировать и кодировать такого рода ограничения
Кaждая из этих ошибок должна быть прозрачной для программы, вызывающей данный пул рабочих процессов, и на основе этой информации новые попытки должны быть легко воспроизводимы. Например, если агент терпит сбой — либо вызывая функцию fail с указанием причины, либо по причине таймаута, — мы хотим, чтобы были доступны данные о сбое, а также исходная спецификация и состояние машины на момент сбоя, чтобы мы могли легко запустить рабочие процессы для выполнения этой задачи заново.
Тщательное продумывание этих интерфейсов обработки ошибок позволяет нам быть уверенными в том, что каждый компонент выполнит свою задачу. Такая надёжность имеет решающее значение при проектировании систем, состоящих из множества компонентов, каждый из которых может выйти из строя независимо от других.
ramure - открытая среда выполнения для агентов
ramure - это новая среда выполнения агентных систем, напрямую воплощающая описанные выше идеи. это библиотека, упрощающая определение и развертывание потоков агентов в любых программных средах. ramure предоставляет примитивы, которые позволяют создавать более надёжное программное обеспечение на основе агентов.
Центральным объектом ramure является agent_process (АП), который определяется с помощью декоратора @agent_process. Внутри этой функции можно определять агенты и машины, а также способы их взаимодействия.
Когда происходит вызов декорированного АП, инициализируется фоновая среда выполнения, которая в дальнейшем отвечает за жизненный цикл агентов и машин. Для управления жизненным циклом АП можно определять события с помощью @agent.on , которые агенты могут вызывать в вашем коде.
Вот пример программы с одним рабочим процессом:
import asyncio from ramure import agent, agent_process, done, fail, wait @agent_process async def start_worker(task_id: str, spec: str) -> str: # инициализируем агента (локально или в удалённой изолированной среде) worker = await agent(f"worker-{task_id}") # регистрируем инструменты, которые агент может вызывать через управляющую среду @worker.on("finish") async def on_finish(summary: str) -> str: """ Вызовите этот инструмент с результатом, когда задача будет завершена.""" done(summary) return "Recorded." @worker.on("give_up") async def on_give_up(reason: str) -> str: """ Вызовите этот инструмент, если не можете выполнить задачу.""" fail(f"gave_up: {reason}") return "Recorded." await worker.send( f"Task {task_id}:\n\n{spec}\n\n" "When done, call finish(summary). " "If impossible, call give_up(reason)." ) # ожидаем завершения жизненного цикла через события done/fail от агента return await wait() if __name__ == "__main__": spec = "Check for bugs introduced in PR #17." print(asyncio.run(start_worker("t0", spec)))
Здесь наш АП представляет собой простой агент-исполнитель задач, имеющий событие завершения и возможность прервать работу. Структурирование потока информации в программе упрощает надежное использование ресурсов агентов, особенно в более сложных случаях. Вы также можете настроить, из какого image будет запускаться агент - с вашего компьютера, из образа Docker или из образа удаленной песочницы. Агент запускается через обвязку pi, и его работу можно наблюдать в tmux.
АП компонуются. АП может вызывать другой АП так же, как вы вызываете любую асинхронную функцию, чтобы получить его конечный результат, или вы можете получить дескриптор к его потоку асинхронных событий. Этот дескриптор возвращается функцией spawn(). Он хранит все события дочернего АП и может реагировать на его поведение во время работы — повторять попытку в случае сбоя или взаимодействовать с ним. Это позволяет нам иметь различные функциональные компоненты, которые мы можем объединять в рамках выполнения агента.
АП также могут определять конкретные способы взаимодействия с ними, предоставляя API, который можно вызвать в коде или через другой агент. Для этого мы используем декоратор expose, например, здесь, чтобы создать пул рабочих процессов, в который можно добавлять задачи:
@agent_process(image=LocalImage()) # run on local async def worker_pool() -> None: specs: dict[str, str] = {} # @expose позволяет родительскому процессу, который запускает этот пул рабочих процесов, # впоследствии вызывать данную функцию @expose async def add_task(spec: str) -> str: tid = f"t{len(specs):04d}" specs[tid] = spec emit("task_added", {"task_id": tid, "spec": spec}) # bubble перенаправляет события дочернего процесса родительскому процессу, # помечая их полем `source`, чтобы вызывающая сторона могла различать источники событий bubble(spawn(start_worker, tid, spec), source=tid) return tid @expose async def tasks() -> dict[str, str]: return dict(specs) await wait()
Затем можно использовать доступный пул рабочих процессов различными способами:
pool = spawn(worker_pool) # прямой вызов функции await pool.call( "add_task", spec="Check for bugs introduced in PR #17.", ) # либо подключаем агента — все функции, помеченные как @expose, становятся доступными ему как инструменты monitor = await agent( "monitor", system_prompt="You run a pool of workers.", ) # теперь агент может вызывать предоставленный интерфейс await pool.attach(monitor, prefix="pool_")
Итак, ramure — пример инструмента, который предоставляет простые примитивы для описания, развертывания и мониторинга экземпляров надежного агентского программного обеспечения. Это небольшой и простой проект — менее 3k строк кода. Попробуйте выполнить с его помощью команды uv tool install ramure или pip install ramure.
d3d14
Правильно я понимаю: 1 агент = 1 процесс? А если агентов будут сотни?