Многие внутри BitDive привыкли к Python: для анализа данных, прототипирования агентов и построения CI/CD‑утилит этот язык незаменим. Но когда нам потребовался единый масштабируемый MCP‑сервер (Message Control Plane) для обработки и маршрутизации телеметрии в реальном времени, мы решили попробовать нечто более декларативное и «из коробки» готовое к бою. Наш выбор — Spring Boot вместе с новым модулем Spring AI, который позволяет легко описывать инструменты (Tools) и управлять ими через единый SSE‑интерфейс.

1. Введение: почему Spring AI для MCP

1.1. Основные требования к MCP

  1. Высокая пропускная способность. Поток телеметрии достигает десятков тысяч событий в секунду.

  2. Гибкость маршрутизации. Одно событие должно одновременно попадать в ClickHouse, Kafka, Elasticsearch и другие хранилища.

  3. Простота расширения. Каждый новый инструмент (например, AI‑аннотация или трассировка) должен подключаться минимальным набором конфигурации.

  4. Надёжность и отказоустойчивость. Реактивная модель должна автоматически справляться с бэкпрешером и обеспечивать предсказуемые задержки.

1.2. Почему не Python или Go?

  • Python + asyncio. Нам пришлось бы тонко настраивать loop’ы, uvloop и C‑расширения для достижения нужной производительности. Код становился бы сложнее в сопровождении.

  • Go. Высокая производительность, но наша команда преимущественно на Java/Python. Обучение и поддержка Go‑экосистемы потребовали бы времени и ресурсов.

Spring Boot + Spring AI стали компромиссом: знакомый стек, минимальный «боилерплейт» и мощные декларативные возможности.

2. Простота разработки: от конфигурации до готового сервера

2.1. Шаг 1: Конфигурация в application.yml

Полный пример секции Spring AI для MCP‑сервера:

spring:
  ai:
    mcp:
      server:
        request-timeout: 180                # Таймаут AI‑запроса в секундах
        enabled: true                       # Включить инструменты Spring AI для MCP
        type: ASYNC                         # Режим работы: ASYNC или SYNC
        name: bitdive-mcp                   # Имя сервера в списке доступных инструментов
        version: 1.0.0                      # Версия сервера (может использоваться при развёртывании)
        instructions: |                     # Описание сервера (передаётся клиентам)
          Этот сервер предоставляет инструменты
          для работы с системой мониторинга BitDive.

        # Настройка SSE (Server‑Sent Events)
        sse-endpoint: /sse                  # Точка подписки на события
        sse-message-endpoint: /mcp/message  # Эндпоинт для приёма команд

# Пример настроек Netty для повышения производительности
server:
  port: 8080
  reactive:
    max-http-header-size: 16384
    idle-timeout: 120s
  netty:
    max-connections: 10000

Объяснение ключевых параметров:

  • type: ASYNC → клиент отправляет команду, а ответы инструментов приходят через SSE.

  • name и version → видны в списке сервисов у клиентов, облегчают роутинг и версионирование.

  • instructions → автоматически отображаются в помощи (help) у клиента.

2.2. Шаг 2: Подключаем зависимость

В pom.xml:

<dependency>
  <groupId>org.springframework.ai</groupId>
  <artifactId>spring-ai</artifactId>
  <version>1.0.0</version>
</dependency>

После этого Spring Boot автоматически активирует балансировку Reactor Netty, настройку SSE и всё, что нужно для работы Spring AI.

2.3. Шаг 3: Описание Tools (инструментов)

С помощью аннотаций @Tool и @ToolParam описываем любой сервисный слой как набор команд:

@Service
@RequiredArgsConstructor
@Slf4j
public class HeapMapTools {

    private final MonitoringHeapMapComponent heapMapComponent;
    private final ApiKeyComponent apiKeyComponent;

    @Tool(
      name = "getCurrentHeapMapAllSystem",
      description = "Возвращает метрики производительности для всех систем"
    )
    public List<Map<String, Object>> getCurrentHeapMap(
            @ToolParam(description = "API‑ключ для доступа") String apiKey
    ) {
        var user = apiKeyComponent.decryptApiKey(apiKey);
        return heapMapComponent.getCurrentHeapMap(user);
    }

    @Tool(
      name = "getCurrentHeapMapForModule",
      description = "Метрики по конкретному модулю"
    )
    public List<Map<String, Object>> getCurrentHeapMapForModule(
            @ToolParam(description = "Имя модуля") String module,
            @ToolParam(description = "API‑ключ для доступа") String apiKey
    ) {
        var user = apiKeyComponent.decryptApiKey(apiKey);
        return heapMapComponent.getCurrentHeapMap(module, user);
    }

    // Аналогично: для сервиса и для класса...
}

Почему так удобно? Вы описываете чисто бизнес‑логику, интерфейс автоматически становится доступен через SSE — без ручной регистрации или сложных конфигов.

2.4. Шаг 4: Регистрируем коллбэки инструментов

В конфигурационном классе объединяем все Beans с инструментами:

@Configuration
public class ToolCallbackConfig {
    @Bean
    public ToolCallbackProvider monitoringTools(
            TraceTools traceTools,
            HeapMapTools heapMapTools,
            LastCallTools lastCallTools
    ) {
        return MethodToolCallbackProvider.builder()
                .toolObjects(
                    traceTools,
                    heapMapTools,
                    lastCallTools
                )
                .build();
    }
}

После старта приложения под капотом Spring AI:

  • Сканирует все методы с @Tool и создаёт мэппинг

  • Запускает Netty‑сервер, слушающий /sse

  • На /mcp/message принимает команды и вызывает соответствующий метод

3. Подробности работы и решение проблем

3.1. Реактивный поток и backpressure

Spring WebFlux + Reactor Netty автоматически обрабатывает backpressure. Если клиент присылает события слишком быстро, Netty регулирует скорость чтения, а Reactor вовсе не выделяет лишние буферы.

Пример приёма и параллельной маршрутизации:

public Mono<Void> ingest(ServerRequest req) {
    return req.bodyToFlux(TelemetryEvent.class)
        .flatMap(event -> Flux.merge(
            writeToClickHouse(event),
            sendToKafka(event),
            enrichWithAI(event)
        ))
        .then();
}

Каждый flatMap — это отдельный асинхронный поток, а Flux.merge гарантирует независимость каналов.

3.2. Масштабирование и отказоустойчивость

  • Вертикальное масштабирование: настройка Netty позволяет увеличить число соединений до десятков тысяч.

  • Горизонтальное: несколько инстансов под балансировщиком Kubernetes принимают на себя часть пула SSE‑клиентов.

3.3. Расширение через AI‑инструменты

Допустим, нужно добавить команду, которая на основе события даёт рекомендацию по оптимизации GC-параметров через LLM:

@Tool(
  name = "recommendGC",
  description = "Рекомендации по GC конфигурации"
)
public String recommendGC(
        @ToolParam("Текущие метрики heap") String heapMetrics,
        @ToolParam("API‑ключ") String apiKey
) {
    var user = apiKeyComponent.decryptApiKey(apiKey);
    String prompt = "Дай советы по GC конфигурации на основе: " + heapMetrics;
    return aiClient.generateText(prompt).getText();
}

Без изменения сети или инфраструктуры сервер сразу начнёт обрабатывать эту новую команду.

3.5. Проблема разрыва соединений и решение heartbeat

При использовании SSE-клиенты иногда теряли соединение: в Java SDK для MCP отсутствовала поддержка механизма ping–pong, из-за чего промежуточные балансировщики (NGINX, облачные LB) закрывали «холостые» соединения после таймаута.

Чтобы поддерживать активность сессий, мы добавили в провайдер SSE-соединений метод sendHeartbeat(), который посылает «ping» всем подключённым клиентам без ожидания ответа:

/**
 * Sends a heartbeat (ping) to all connected clients to keep connections alive. This
 * method sends ping notifications to all active sessions without expecting a
 * response, which helps prevent connection timeouts.
 * @return A Mono that completes when heartbeat has been sent to all sessions
 */
public Mono<Void> sendHeartbeat() {
    if (sessions.isEmpty()) {
        logger.debug("No active sessions to send heartbeat to");
        return Mono.empty();
    }

    logger.debug("Sending heartbeat to {} active sessions", sessions.size());

    return Flux.fromIterable(sessions.values())
        .flatMap(session -> session.sendNotification(McpSchema.METHOD_PING, null)
            .doOnSuccess(v -> logger.trace("Heartbeat sent successfully to session {}", session.getId()))
            .doOnError(e -> logger.warn("Heartbeat failed for session {}: {}", session.getId(), e.getMessage()))
            .onErrorComplete()) // Continue with other sessions even if one fails
        .then();
}

И регистрируем периодическую отправку:

@Bean
public Disposable heartbeatLoop(WebFluxSseServerTransportProvider provider) {
    return Flux.interval(Duration.ofSeconds(40))
            .flatMap(t -> provider.sendHeartbeat())
            .subscribe();
}

Итоги

  1. Декларативность. Никаких ручных WebClient, JSON‑парсинга и сложных конфигураций.

  2. Реактивность. Автоматический backpressure, независимые потоки, высокая пропускная способность.

  3. Расширяемость. Новые команды подключаются одной аннотацией.

  4. Интеграция AI. Spring AI из коробки работает с LLM — внутренняя логика клиента спрятана за абстракцией.

Наш MCP‑сервер на Spring AI стал ядром системы маршрутизации и инструментов, заменив десятки скриптов и микросервисов. Теперь телеметрия обрабатывается быстро, надёжно и просто расширяется — кроме Python, мы успешно дополнили стек сильным Java‑решением.

Комментарии (4)


  1. Irina76
    29.06.2025 15:54

    очень полезно , спасибо большое


  1. Faragon Автор
    29.06.2025 15:54

    мы старались)


  1. Gabenskiy
    29.06.2025 15:54

    и почему мне кажется, что этот текст сгенерила нейронка?


    1. Faragon Автор
      29.06.2025 15:54

      почему вам так кажется ?