JDK Flight Recorder (JFR) — это диагностическая подсистема, встроенная в JVM. В основе JFR лежит очень простая идея, но вокруг нее выросла разнообразная экосистема решений, позволяющих решать широкий спектр задач.
В данной статье я хочу сфокусироваться на одном аспекте технологии JFR — потоковой обработке событий. Потоковая обработка появилась в JDK 14 в виде Flight Recorder Event Streaming API и позволяет прикладному коду обрабатывать события JFR с минимальной задержкой. Далее в статье я буду писать Streaming API для краткости.


Что такое событие JFR?


Чтобы объяснить возможности Streaming API, нужно немного рассказать о работе JFR.


Событие — это основное понятие в мире JFR. Есть источники событий (зонды) — часть из них встроена непосредственно в JVM и их более сотни, источником событий может выступать и прикладной код публикующий события через соответствующий API.
Событие имеет тип, который определяет его схему, и может включать разнообразные данные.


JFR реализует единую точку управления конфигурацией всех источников-зондов, позволяет включать нужную комбинацию (профиль) и передавать параметры зонду при необходимости (например частоту сбора сэмплов для событий сэмплирования методов).


JFR отвечает за накопление и буферизацию событий. Те, кто пробовал реализовывать общие очереди в многопоточных приложениях, знают, что это таит в себе ряд подводных камней. JFR использует продвинутые трюки, в том числе локальные буфера потоков, чтобы минимизировать накладные расходы, связанные с накоплением событий. Также JFR обеспечивает политики ротации и вытеснения старых событий на диск и последующее их удаление.
Доступ к накопленным событиям возможен различными способами (API, jcmd, JMX). JFR имеет открытый бинарный формат записи событий, который поддерживается различными инструментами.


Зачем нужен Streaming API?


Изначально модель работы с JFR подразумевала ретроспективный анализ выгруженного массива событий.
Типичная сессия профилирования могла выглядеть так:


  • Старт сессий JFR;
  • Ожидание накопления событий;
  • Выгрузка массива событий JFR в файл;
  • Остановка сессии (события перестают публиковаться);
  • Работа с данными файла в Mission Control или другом инструменте.

При использовании непрерывного профилирования сессия стартует при запуске JVM и не останавливается. Но для анализа всё равно надо выгрузить данные в файл.


С развитием технологий мониторинга, однако, растёт потребность в получении диагностических событий в реальном времени. JVM предоставляет возможность мониторинга многих параметров системы через интерфейс JMX, но JFR события дают большую глубину и детализацию.


Появления Streaming API открыло возможности использования JFR для мониторинга в реальном времени.


Возможности Streaming API


Streaming API даёт возможность обрабатывать события JFR с минимальной задержкой без необходимости лишних манипуляций с файлами.
Есть два варианта подписки на события — пассивный и активный.
В пассивном варианте подписка делается без запуска сессии JFR, сбор событий в этом случае должен быть запущен другими методами.


Пример кода ниже позволяет начать слушать события типа "jdk.JavaMonitorEnter" (событие захвата монитора), публикуемые в локальном процессе, и печатать их в консоль.


    try (EventStream stream = EventStream.openRepository()) {
       stream.onEvent("jdk.JavaMonitorEnter", System.out::println);
       stream.start();
    }

Такую же подписку можно создать и передав в качестве параметра путь к дисковому буферу, используемому JFR.


    Path path = Path.of("/tmp/myjfr/");
    try (EventStream stream = EventStream.openRepository(path)) {
       stream.onEvent("jdk.JavaMonitorEnter", System.out::println);
       stream.start();
    }

Второй вариант может быть использован для подписки на события другого процесса в рамках одного хоста. Этот сценарий я подробнее разберу ниже на примере Kubernetes.


Активный режим позволяет управлять поведением зондов в рамках подписки. В активном режиме нет необходимости стартовать сессию JFR тем или иным способом, она привязана к подписке.


    try (RecordingStream stream = new RecordingStream()) {
       stream.enable("jdk.JavaMonitorEnter").withStackTrace();
       stream.onEvent("jdk.JavaMonitorEnter", System.out::println);
       stream.start();
    }

В примере выше, используя объект RecordingStream, мы можем индивидуально включать интересующие нас события. Такой вариант более удобен для реализации встраиваемых агентов, экспортирующих события JFR в другие системы работы с диагностическими данными.


Возможно также использование RecordingStream и при удалённом подключении к JVM, но об этом ниже.


Конвертируем JFR события в метрики micrometer, используя Streaming API


Возможно у читателя возник вопрос — а зачем всё это нужно в моём Spring приложении? Попробую ответить на этот вопрос простым примером.


В JDK 21 появились виртуальные потоки (точнее JDK 21 — первая LTS версия, в которой они доступны). Один из нюансов виртуальных потоков (решенный в JDK 24) — это блокировка потока-носителя, если виртуальный поток владеет Java монитором (находится в synchronized блоке). Эта особенность может приводить к проблемам с исчерпанием пула потоков-носителей.
В JFR доступно событие, связанное с подобной блокировкой потока-носителя. А наше приложение уже имеет интеграцию с системой мониторинга, реализованную через инфраструктуру метрик в Spring.


Наша задача подписаться на JFR события и публиковать их в реестр метрик Spring.


Контролер отвечает за запуск подписки


    @Component
    class JfrEventLifecycle implements SmartLifecycle {
       private final AtomicBoolean running = new AtomicBoolean(false);
       private final JfrVirtualThreadPinnedEventHandler virtualThreadPinnedEventHandler;
       private RecordingStream recordingStream;
       JfrEventLifecycle(JfrVirtualThreadPinnedEventHandler virtualThreadPinnedEventHandler) {
           this.virtualThreadPinnedEventHandler = virtualThreadPinnedEventHandler;
       }
       @Override
       public void start() {
           if (!isRunning()) {
               recordingStream = new RecordingStream();
               recordingStream.enable("jdk.VirtualThreadPinned").withStackTrace();
               recordingStream.onEvent("jdk.VirtualThreadPinned", virtualThreadPinnedEventHandler::handle);
               // prevents memory leaks in long-running apps
               recordingStream.setMaxAge(Duration.ofSeconds(10));
               recordingStream.startAsync();
               running.set(true);
           }
       }
       @Override
       public void stop() {
           if (isRunning()) {
               recordingStream.close();
               running.set(false);
           }
       }
       @Override
       public boolean isRunning() {
           return running.get();
       }
    }

Обработчик пропускает через себя события и обновляет метрику


    @Component
    class JfrVirtualThreadPinnedEventHandler {
       private static final int STACK_TRACE_MAX_DEPTH = 25;
       private final Logger log = LoggerFactory.getLogger(JfrVirtualThreadPinnedEventHandler.class);
       private final MeterRegistry meterRegistry;
       JfrVirtualThreadPinnedEventHandler(MeterRegistry meterRegistry) {
           this.meterRegistry = meterRegistry;
       }
       void handle(RecordedEvent event) {
           // marked as nullable in Javadoc
           var thread = event.getThread() != null ? event.getThread().getJavaName() : "<unknown>";
           var duration = event.getDuration();
           var startTime = LocalDateTime.ofInstant(event.getStartTime(), ZoneId.systemDefault());
           var stackTrace = formatStackTrace(event.getStackTrace(), STACK_TRACE_MAX_DEPTH);
           log.warn(
                   "Thread '{}' pinned for: {}ms at {}, stacktrace: \n{}",
                   thread,
                   duration.toMillis(),
                   startTime,
                   stackTrace
           );
           var timer = meterRegistry.timer("jfr.thread.pinning");
           timer.record(duration);
       }
        private String formatStackTrace(RecordedStackTrace stackTrace, int maxDepth) {
           if (stackTrace == null) {
               return "\t<not available>";
           }
           String formatted = "\t" + stackTrace.getFrames().stream()
                   .limit(maxDepth)
                   .map(JfrVirtualThreadPinnedEventHandler::formatStackTraceFrame)
                   .collect(Collectors.joining("\n\t"));
           if (maxDepth < stackTrace.getFrames().size()) {
               return formatted + "\n\t(...)"; // truncated
           }
           return formatted;
       }
       private static String formatStackTraceFrame(RecordedFrame frame) {
           return frame.getMethod().getType().getName() + "#" + frame.getMethod().getName() + ": " + frame.getLineNumber();
       }
    }

Полный код проекта доступен по ссылке — https://github.com/mikemybytes/jfr-thread-pinning-spring-boot


Таким образом, нам удалось "подцепить" метрику и JFR и опубликовать её через реестр метрик Spring. Если у нас уже настроен стэк мониторинга, например, Prometheus + Grafana, то добавить чарт времени блокировки виртуальных потоков не составит труда.


Я думаю, что подобный сценарий будет наиболее популярным способом использования Streaming API в прикладном коде.
Разумеется, в идеальном мире нам было бы достаточно подключить какой-нибудь "jfr-starter" в проект и получить все интересные метрики автоматически. Но увы, такой модуль ещё никем не написан. Streaming API позволяет вам пойти и решить подобную проблему здесь и сейчас. И кто знает, может, читатель этой статьи и окажется автором того самого модуля "jfr-stater".


Как ещё можно использовать Streaming API?


Считать метрики на основе JFR событий — наиболее очевидное использование Streaming API. Но мне захотелось упомянуть ещё одно, не совсем очевидное, применение для данной технологии — автоматизация тестов на нефункциональные требования.


jfrunit — библиотека, позволяющая делать проверки с использованием событий JFR в JUnit тестах.


Кто-то может справедливо заметить, что JUnit не совсем предназначен для тестов на нефункциональные требования, и это правда …
Но иногда возникают ситуации, когда такие тесты нужны, и если можно использовать JUnit, то почему бы и нет.
Пример такой ситуации — тест на корректное освобождение ресурсов при отмене выполнения сложной асинхронной задачи. Ниже приведён пример теста на jfrunit.


    @JfrEventTest
    public class JfrUnitTest {

        public JfrEvents jfrEvents = new JfrEvents();

    private JfrEventType eVirtualThreadStart = new JfrEventType("jdk.VirtualThreadStart") {};
    private JfrEventType eVirtualThreadEnd = new JfrEventType("jdk.VirtualThreadEnd") {};

    @EnableEvent("jdk.VirtualThreadStart")
    @EnableEvent("jdk.VirtualThreadEnd")
    @Test
    public void test_no_thread_leak() throws InterruptedException {
        // задача создаёт несколько виртуальных потоков для работы
        CompletableFuture<Object> heavyTask = startHeavyTask();
        Thread.sleep(10);

        // отмена задачи должна завершать выполнение всех подзадач
        heavyTask.cancel(true);

        Assertions.assertThatThrownBy(() -> heavyTask.join())
        .isExactlyInstanceOf(CancellationException.class);

        jfrEvents.awaitEvents();

        var started = jfrEvents.filter(eVirtualThreadStart).count();
        var ended = jfrEvents.filter(eVirtualThreadEnd).count();

        // проверяем, что все созданные виртуальные потоки завершены
        Assertions.assertThat(ended)
            .as("VirtualThreadEnd events %d should be equal to VirtualThreadStart events %d", ended, started)
            .isEqualTo(started);
    }

     …

Авторы проекта jfrunit в качестве примера проводят тест, в котором проверяется число SQL запросов при выполнении кода.


    @SpringBootTest
    @JfrEventTest
    public class SpringJooqGradleApplicationTests {
       @Autowired
       public TestUserService testUserService;
       public JfrEvents jfrEvents = new JfrEvents();
       @Test
       public void createUser() {
           boolean success = testUserService.createUser(String.valueOf(ThreadLocalRandom.current().nextLong()),
               ThreadLocalRandom.current().nextInt());
           Assertions.assertThat(success).isTrue();
           jfrEvents.awaitEvents();
           Assertions.assertThat(jfrEvents.events().filter(this::isQueryEvent).count()).isEqualTo(1);
       }
    }

Полный код проекта доступен по ссылке — https://github.com/moditect/jfrunit-examples/tree/main/spring-jooq-gradle


Это пример интересен тем, что использует другой проект JMC Agent для создания, c использованием инструментации, событий JFR связанных с работой JOOQ библиотеки.


Удалённый доступ к Streaming API


Доступ к Streaming API возможен не только в рамках одного JVM процесса.
Традиционно для удалённого доступа к диагностическим инструментам JVM используется протокол JMX. Он выключен по умолчанию и должен быть явным образом настроен для использования.
Пример кода ниже позволяет создать объект RecordingStream для подписки на события удалённой JVM доступной по JMX.


    String host = "com.example";
    int port = 7091;
    String url = "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    JMXServiceURL u = new JMXServiceURL(url);
    JMXConnector c = JMXConnectorFactory.connect(u);
    MBeanServerConnection connection = c.getMBeanServerConnection();
    try (RemoteRecordingStream stream = new RemoteRecordingStream(connection)) {
       stream.enabled("jdk.JavaMonitorEnter").withStackTrace();
       stream.onEvent("jdk.JavaMonitorEnter", System.out::println),
       stream.start();
    }

Под капотом RemoteRecordingStream использует poll стратегию, и задержка получения событий может составлять несколько секунд.


Другой способ удалённой подписки на события JFR использует совместный доступ к файловой системе.


Давайте рассмотрим данный сценарий на примере запуска агента мониторинга в качестве контейнера в Kubernetes в соответствии с паттерном "side car".


У нас есть POD, который содержит контейнер приложения. Мы хотим запустить агент мониторинга как дополнительный контейнер (это, например, важно, если мы не хотим вносить изменения в образ приложения). Для обмена данных, обоим контейнерам потребуется общий доступ к файловой системе, который можно обеспечить, создав дисковый том (volume) типа "dir" и примонтировав его к обоим контейнерам.



В приложении для запуска приложения используем дополнительные параметры командной строки


java \ 
-XX:StartFlightRecording \ -XX:FlightRecorderOptions:repository=/var/jfr/repo \
…

Параметры выше запускают JFR с настройками по умолчанию, дополнительно прописываем путь на диске для “репозитория” — директории, которую JFR будет использовать для буферизации событий.
В нашем случае путь /var/jfr/repo должен вести на дисковый том доступный обоим контейнерам, как было описано выше.
Теперь на агенте мы можем использовать следующий код для подписки на события.


    Path path = Path.of("/var/jfr/repo/");
    try (EventStream stream = EventStream.openRepository(path)) {
       stream.onEvent(System.out::println);
       stream.start();
    }

Код выше — просто демонстрация подписки. В реальности код агента должен каким-то образом обрабатывать события и публиковать метрики в вашу инфраструктуру мониторинга.
Если сравнивать с JMX, подписка через файловую систему проще в настройке и менее уязвима с точки зрения безопасности. Влияние на процесс приложения также меньше в этом случае.


Заключение


Появление Streaming API для подписки на события JFR в JDK 14 существенно расширило возможности его использования для мониторинга в реальном времени.
Несмотря на то, что аудитория Streaming API — это прежде всего авторы мониторинговых агентов и инструментов диагностики, он без труда может быть использован в приложении. Иногда нужно решить частную проблему мониторинга “здесь и сейчас”, не дожидаясь, пока нужные вам метрики появятся в продуктах/фреймворках, которые вы используете. В этом случае воспользоваться Streaming API и реализовать метрику в своём коде — вполне оправданно.
Также jfrunit демонстрирует нам креативное использование для Streaming API, несвязанное с мониторингом.


В целом, можно сказать, что экосистема, возникшая вокруг JFR, пока не поспевает за его развитием, и я ожидаю появление новых интересных решений, использующих API JFR, включая Streaming API, в ближайшем будущем.

Комментарии (0)