Привет, Хабр! Я Евгений Шурупов, техлид бекенда в ОТП Банке в команде дебетового карточного конвейера. В этой статье я расскажу, как мы улучшали логи внедрением сквозных идентификаторов и не только. Делали это, чтобы упростить и ускорить разбор возникающих ошибок. В результате это в разы сократило время разбора, и в то же время упростило жизнь разработчикам при написании кода, поскольку теперь не нужно каждый раз думать о логах. Нужный код написан единожды (или почти) и добавлен в код каждого сервиса через зависимость.
Статья может быть полезна backend-разработчикам, лидам, тестировщикам, аналитикам, которые проектируют и разрабатывают сложные системы. Код написан на java и соседних технологиях, но подходы, я уверен, универсальны. Они упростят жизнь при разборе ошибок на проде и на тестовых стендах.
С чего всё начиналось
Наш конвейер занимается выпуском дебетовых карт в разных каналах: ДБО (мобильные приложения и личный кабинет в браузере), неавторизованная зона сайта, отделение банка и т.д.
Путь по процессу выпуска карт зависит от разных вводных, т.е. может протекать по разному. Это зависит как от канала, так и от типа карты (цифровая, именная, мгновенная), выбранного продукта и много чего ещё.
По пути конвейер обращается в разные внешние системы банка. Может ожидать ввода данных пользователя, например менеджера в отделении, может ожидать доставки карты клиенту, или подписания документов, как через смс или приложение, так и бумажное в отделении. Чтобы система, стартовавшая процесс, обеспечивала своевременное взаимодействие с клиентом, конвейер отправляет коллбэки с актуальным статусом и необходимой информацией.
На пути карты к клиенту могут случаться ошибки. Причиной могут быть, как проблемы во внешних системах, временные и не очень, так и проблемы в данных, а также баги самого конвейера.
Сервисы конвейера работают на стеке Java 17, spring-boot. Оркестрируется процесс сервисом на процессном движке camunda 7 поверх spring-boot.
В нашей инфраструктуре логи у пода кубернетиса забирает filebeat, распарсивает json и кладёт их в elasticsearch. И уже мы через UI Opensearch смотрим логи.
Мы логи пишем в JSON, удобный для elastic, с помощью logstash-logback-encoder. Для этого добавлен следующий код.
Строка в build.gradle
implementation "net.logstash.logback:logstash-logback-encoder:X.X"
Файл logback.xml в папке main/resources
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="DEBUG">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
В некоторых уникальных случаях мы в коде сами выводим логи, но всё-таки в первую очередь логировать надо взаимодействия между сервисами, а также взаимодействия с внешними системами.
Для REST-запросов между сервисами мы используем довольно стандартную либу в spring стеке open-feign. Open-feign уже предоставляет средства логирования, которые можно даже настраивать.
Проблема 1. Недостаточные и избыточные логи open-feign
Логи запроса и ответа стандартные для open-feign в минимальном виде выглядит примерно вот так:

Для минимального лога мы видим в логе только URL запроса, метод и статус ответа. Для разбора будущей ошибки на проде этого будет недостаточно. Ошибка может быть из-за отправленных неверно данных, не в том формате. Также в ответе вполне может быть информация, которая поможет найти причину, если просто не укажет её.
Вывод в лог запроса при настроенном максимально подробном логировании, выглядит примерно так:

Это только то, что уместилось в экран. Несколько строк не уместилось.
Здесь вроде бы исчерпывающие данные о запросе и ответе. Однако, здесь есть другая проблема. Данных слишком много. Имя java-класса и его метода в каждом сообщении явно избыточно. С ходу сложно разобраться и найти то, что нужно. Разные данные одного запроса раскидываются на разные сообщения логов.
Дальше больше. Во-первых, сообщения о запросе не свернуть. Если мы знаем, что после найденного запроса идут ещё два, а нам нужен третий, то придётся листать несколько экранов, вместо того, чтобы увидеть все четыре запроса на одном экране. Если нам нужно просто просмотреть, какие были запросы, и статусы их ответов, мы не сможем это сделать на одном экране.
К этому добавляется проблема, что логируются только исходящие запросы. Open-feign это либа для исполнения исходящих http-запросов, поэтому входящие она не логирует. В каждом контроллере писать однообразный код, который будет логировать входящий запрос плохая практика. Возможно, в стеке спринга есть хорошее решение, отличающееся от того, что будет дальше, но я о нём не знаю.
То есть, если причиной ошибки был некорректный входящий запрос от вызывающей внешней системы, мы из логов это узнать не сможем.
Решение
Здесь на помощь к нам приходит классная библиотека Logbook.
Чтобы её подключить надо добавить нескольких строк в файл build.gradle
:
implementation "org.zalando:logbook-spring-boot-starter:X.X.X"
implementation "org.zalando:logbook-logstash:X.X.X"
implementation 'org.zalando:logbook-openfeign:X.X.X'
и одну строку в application.yml
logging:
level:
org.zalando.logbook.Logbook: TRACE
Также нужно объявить бин в контексте спринга.
@Configuration
public class CommonStarterAutoConfiguration {
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public Logbook logbook() {
HttpLogFormatter formatter = new JsonHttpLogFormatter();
LogstashLogbackSink logstashsink = new LogstashLogbackSink(formatter);
return Logbook.builder()
.sink(logstashsink)
.build();
}
}
Объявление бина мы кладём в отдельный спринговый стартер. И добавляем зависимость от него в каждый сервис конвейера.
Теперь запросы, как входящие, так и исходящие логируются. В списке сообщений лога теперь есть только два сообщения: с данными о запросе и ответе. В свёрнутом сообщении видна основная информация о запросе и ответе: метод, урл, статус ответа. Но если развернуть сообщение, то мы увидим, всю информацию, включая тело и заголовки. До кучи мы увидим время выполнения запроса и id запроса, чтобы имея запрос, легко найти ответ и наоборот.


Проблема 2. Проблемы совместимости Logbook, Logstash и filebeat/elasticsearch из коробки
В той конфигурации, которая описана выше, казалось бы всё должно быть хорошо, ведь логи долетают до opensearch в удобном виде. Однако, есть нюанс. У каждого сообщения, которое логирует конкретный REST-запрос или ответ, появляются новые поля. Поля запроса и ответа отображаются в поля, имена которых состоят, в том числе из пути к этому полю в теле реального запроса/ответа.
Пример
Тело запроса
{
"field1" : "value1",
"field2": {
"field3" : "value3"
}
}
Преобразуется в сообщение
{
...
"msg.http.body.field1": "value1",
"msg.http.body.field2.field3" : "value3",
...
}
Казалось бы ничего страшного. Но это порождает следующую ситуацию. В одном запросе или ответе поле с довольно стандартным именем может быть одного типа, а в другом запросе есть поле с таким же именем, но другим типом.
Пример 1
{
"code": "SOME_TEXT"
}
Пример 2
{
"code": 1
}
Дело в том, что opensearch или elasticsearch получая новое сообщение и видя новое поле, обновляет индекс добавляя тип для этого поля в сообщении. Но если оно получает поле, тип которого отличается от уже сохранённого типа для этого поля, случается ошибка, и всё сообщение не попадает в opensearch. Таким образом мы теряем часть сообщений. То есть часть инцидентов расследовать не сможем, или это будет затруднено. Тут можно сказать, что надо внимательнее относится к типам полей, выработать какие-то правила. Но на формат API внешних для нас систем мы влиять не сможем, поэтому полностью искоренить проблему таким решением не удастся.
Решение
После некоторого исследования было принято решение изменить конфигурацию так, чтобы поле было единым msg.http.body
и было текстовым. Но, чтобы сохранить удобство, если тело запроса или ответа это JSON, мы будем туда писать JSON, но отформатированный. Для этого мы изменили объявление бина Logbook
:
@Configuration
public class CommonStarterAutoConfiguration {
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public Logbook logbook() {
HttpLogFormatter formatter = new DccJsonHttpLogFormatter();
LogstashLogbackSink logstashsink = new LogstashLogbackSink(formatter);
return Logbook.builder()
.bodyFilter(new PrettyPrintingJsonBodyFilter())
.sink(logstashsink)
.build();
}
}
Вместо JsonHttpLogFormatter
мы написали свой форматтер:
public class DccJsonHttpLogFormatter implements StructuredHttpLogFormatter {
private final ObjectMapper mapper;
public DccJsonHttpLogFormatter() {
this(new ObjectMapper());
}
public DccJsonHttpLogFormatter(final ObjectMapper mapper) {
this.mapper = mapper;
}
@Override
public Optional<Object> prepareBody(final HttpMessage message) throws IOException {
String body = message.getBodyAsString();
if (body.isEmpty()) {
return Optional.empty();
}
return Optional.of(body);
}
@Override
public String format(final Map<String, Object> content) throws IOException {
return mapper.writeValueAsString(content);
}
}
Теперь у нас в логе сообщения не теряются, тело пишется в одно поле msg.http.body
, и оно всегда отформатировано. Разбирать инциденты стало сильно проще.

Проблема 3. Сложность поиска связи одних сообщений лога с другими
Результат одного действия является причиной другого. Однако при одновременной работе нескольких процессов не так просто понять, где в логах сообщения одного процесса, а где — другого. При должном понимании процесса это возможно, однако отнимает время и силы. А для человека, который знает процесс не так глубоко, это может стать невозможным.
Решение
Здесь напрашивается слово сквозной идентификатор. Надо в каждое сообщение лога добавить идентификатор, например заявки, и фильтровать по нему.
Полностью готового решения я не нашёл, поэтому начал его собирать.
В одной из библиотек для логирования, которые мы используем, SLF4J, есть класс org.slf4j.MDC
. Он содержит несколько статических методов, которые позволяют добавлять поля в сообщение лога.
Первый такой метод это public static void put(String key, String val)
. Он как раз и добавляет поле во все сообщения, которые будут после вызова этого метода.
Второй метод public static void remove(String key)
удаляет значение из контекста, т.е. после вызова этого метода сообщения не будут содержать добавленное до этого поле.
Возникает вопрос, где эти методы запускать. Для начала было решено сделать фильтр, реализацию интерфейса jakarta.servlet.Filter
. Внутри этого фильтра перед запуском следующего фильтра будет добавляться поле в контекст логирования, а после — удаляться.
Следующий вопрос, как эти идентификаторы прокидывать дальше. Сначала было решение, что идентификатор надо читать из запроса. Но есть две проблемы. Это не универсально. То есть нужно отдельно писать код для выковыривания каждого типа идентификатора. Вторая проблема — это запросы, которые не содержат этот самый идентификатор. Адаптеры, как правило, вообще не знают, о какой заявке идёт речь. Они этот идентификатор не принимают. Принимают данные, которые нужны системе, которую они вызывают.
После некоторого количества итераций возникло следующее решение.
Создаём CrossIdHolder
который содержит контекст — все сквозные идентификаторы текущего потока. Идентификатор может быть не один. И со временем их можно туда добавлять. Чтобы следующий в цепочке вызовов сервис получил все идентификаторы, они ему передаются через заголовки http-запроса. Для этого напишем реализацию feign.RequestInterceptor
. Чтобы этот следующий сервис увидел заголовки и положил соответствующие идентификаторы в контекст, напишем реализацию jakarta.servlet.Filter
.
Получились примерно вот такие классы:
@Component
public class CrossIdHolder {
public static final IdName CLAIM_ID_NAME =
new IdName(
"claimId",
"claimId",
List.of("x-dcc-claim-id")
);
public static final List<IdName> ID_NAMES = List.of(
CLAIM_ID_NAME
);
private final ThreadLocal<List<Id>> crossIds = new ThreadLocal<>();
public List<Id> get() {
if (crossIds.get() == null) {
crossIds.set(new ArrayList<>());
}
return crossIds.get();
}
public void add(List<Id> context) {
for (Id id : context) {
add(id.name, id.value);
}
}
public void add(IdName name, String value) {
get().add(new Id(value, name));
MDC.put(name.logField, value);
}
public void clear() {
for (IdName idName : ID_NAMES) {
MDC.remove(idName.logField);
}
crossIds.remove();
}
@Value
public static class Id {
String value;
IdName name;
}
@Value
public static class IdName {
String key;
String logField;
List<String> httpHeaders;
}
}
@RequiredArgsConstructor
@Component
public class CrossIdFeignRequestInterceptor implements RequestInterceptor {
private final CrossIdHolder crossIdHolder;
@Override
public void apply(RequestTemplate template) {
for (Id id : crossIdHolder.get()) {
for (String headerName : id.getName().getHttpHeaders()) {
template.header(headerName, id.getValue());
}
}
}
}
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
@Component
@WebFilter(filterName = "crossIdLoggingFilter", urlPatterns = "/*")
@RequiredArgsConstructor
public class CrossIdLoggingFilter extends OncePerRequestFilter {
private final CrossIdHolder crossIdHolder;
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
addCrossIds(request);
try {
filterChain.doFilter(request, response);
} finally {
crossIdHolder.clear();
}
}
private void addCrossIds(HttpServletRequest request) {
for (IdName idName : CrossIdHolder.ID_NAMES) {
extractFromHeader(idName, request);
}
}
private void extractFromHeader(IdName idName, HttpServletRequest request) {
for (String headerName : idName.getHttpHeaders()) {
String value = request.getHeader(headerName);
if (value != null) {
crossIdHolder.add(idName, value);
}
}
}
}
У нас значение основного идентификатора claimId
, идентификатора заявки, генерируется внутри конвейера. При создании заявки. Поэтому в том месте, где мы впервые получили его, сразу кладём его в контекст CrossIdHolder
методом public void add(IdName name, String value)
. Этот вызов заодно положит идентификатор и в контекст MDC
. И после этого значение полетело дальше, и все потоки обработки каждого запроса в каждом сервисе помечены этим идентификатором. Все сообщения лога тоже помечены. Можно отфильтровать, и красота.

Проблема 4. Хотим добавить идентификатор внешней системы
Заявка начинается с запроса из канала — системы, которая запрашивает заявку на открытие карты. У запроса тоже есть идентификатор. Мы хотим также в логах фильтровать не только по claimId
, но и по этому идентификатору requestId
, который существует до создания самой заявки. К тому же может быть негативный сценарий, когда запрос был, а заявка по некоторым причинам не была создана.
Решение
Нам нужен единый механизм, который позволит использовать его для разных каналов, с разными форматами запросов, порождающих заявку.
Сделаем интерфейс, реализация которого будет извлекать нужный идентификатор из входящего запроса.
public interface CustomCrossIdExtractor {
void extract(HttpServletRequest request);
}
И внедрим использование этого интерфейса в наш фильтр
public class CrossIdLoggingFilter extends OncePerRequestFilter {
/* Свойства, объявленные ранее */
private final List<CustomCrossIdExtractor> crossIdExtractors;
/* Методы, объявленные ранее */
private void addCrossIds(HttpServletRequest request) {
for (IdName idName : CrossIdHolder.ID_NAMES) {
extractFromHeader(idName, request);
}
for (CustomCrossIdExtractor extractor : crossIdExtractors) {
extractor.extract(request);
}
}
}
Также добавим знание об идентификаторе в сам контекст
public class CrossIdHolder {
/* Объявленное ранее */
public static final IdName REQUEST_ID_NAME =
new IdName(
"requestId",
"requestId",
List.of("x-dcc-request-id")
);
public static final List<IdName> ID_NAMES = List.of(
CLAIM_ID_NAME, REQUEST_ID_NAME
);
}
Таким образом в нужном сервисе мы объявляем бин или бины, которые реализуют извлечение нужного идентификатора, и это автоматически встраивается в наш механизм.
Проблема 5. В асинхронных эндпоинтах идентификатор теряется
Асинхронные эндпоинты, те, которые вызывающему сервису сразу говорят 200 OK
, а потом запускают поток, в котором собственно делают бизнес-логику, теряют контекст. Логика запускается в отдельном потоке, а данные в CrossIdHolder
сохраняются в привязке к потоку. Аналогично и с MDC
.
Решение
Нужно передать эти данные в контекст нового потока.
Для передачи контекста в новый поток создаём TaskExecutor
, настроенный так, что при запуске выполняемой в нём задачи, контекст получит нужные данные, а после выполнения — очистится. Сделаем для этого конфигурационный бин и специальный TaskDecorator
.
@Configuration
@RequiredArgsConstructor
@ConditionalOnBean(value = AsyncAnnotationBeanPostProcessor.class)
public class AsyncCommonStarterExecutorConfig {
private final CrossIdHolder crossIdHolder;
@Bean(name = "asyncTaskExecutor")
public TaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Async-");
executor.setTaskDecorator(new MdcTaskDecorator(crossIdHolder));
executor.initialize();
return executor;
}
public static class MdcTaskDecorator implements TaskDecorator {
private final CrossIdHolder crossIdHolder;
public MdcTaskDecorator(CrossIdHolder crossIdHolder) {
this.crossIdHolder = crossIdHolder;
}
@NotNull
@Override
public Runnable decorate(@NotNull Runnable runnable) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
final List<Id> crossIdContext = crossIdHolder.get();
return () -> {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
crossIdHolder.add(crossIdContext);
runnable.run();
} finally {
MDC.clear();
RequestContextHolder.resetRequestAttributes();
crossIdHolder.clear();
}
};
}
}
}
Ещё немного конфигов
@Configuration
@Slf4j
@RequiredArgsConstructor
@ConditionalOnBean(value = AsyncCommonStarterExecutorConfig.class)
public class AsyncConfig implements AsyncConfigurer {
private final TaskExecutor asyncTaskExecutor;
@Override
public Executor getAsyncExecutor() {
return asyncTaskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (e, method, params) -> {
log.error("Исключение в асинхронном методе {}", method.getName(), e);
};
}
}
Конфигурационный бин AsyncCommonStarterExecutorConfig
помечен аннотацией @ConditionalOnBean(value = AsyncAnnotationBeanPostProcessor.class)
. Т.е. он будет подключаться только когда у приложения будет подключена асинхронность с помощью @EnableAsync
. Бин AsyncConfig
будет запущен только, если предыдущий будет запущен. Для применения наверняка этого TaskExecutor
для асинхронных методов в аннотацию @Async
надо будет прокинуть его имя: @Async("asyncTaskExecutor")
. Но благодаря AsyncConfig
работать будет и без этого, пока в контексте нет другого TaskExecutor
.
Проблема 6. Неплохо в логах бы различать, что было выполнено синхронно, а что асинхронно
Решение
Добавим параметр initiator
в сообщение лога. В этот параметр мы будем писать инициатора логики в рамках конкретного сервиса.
public class CrossIdHolder {
private static final String INITIATOR_FIELD_NAME = "initiator";
/* Добавленный ранее код */
public void addInitiator(String initiator) {
MDC.put(INITIATOR_FIELD_NAME, initiator);
}
public void clear() {
for (IdName idName : ID_NAMES) {
MDC.remove(idName.logField);
}
MDC.remove(INITIATOR_FIELD_NAME);
crossIds.remove();
}
}
Этот параметр мы не привязываем к потоку в рамках состояния CrossIdHolder
, а только к текущему контексту MDC
. Т.е. этот параметр не передастся к асинхронной задаче и не будет передан в исходящий запрос. Этого нам и не надо.
Добавим его создание в обработку запроса.
public class CrossIdLoggingFilter extends OncePerRequestFilter {
/* Написанная ранее логика */
private void addCrossIds(HttpServletRequest request) {
crossIdHolder.addInitiator("rest");
for (IdName idName : CrossIdHolder.ID_NAMES) {
extractFromHeader(idName, request);
}
for (CustomCrossIdExtractor extractor : crossIdExtractors) {
extractor.extract(request);
}
}
}
Здесь мы помечаем, что происходящее запущено синхронным запросом.
Далее добавим некоторой магии АОП для пометки асинхронной бизнес-логики.
Создадим аннотацию
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AddCrossId {
String logFieldNamePrefix();
String initiator();
}
Ей мы будем помечать асинхронные методы в коде. Т.е. добавим её к методу с аннотацией @Async
.
Напишем аспект
@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class AddCrossIdAspect {
private final ObjectMapper objectMapper;
private final CrossIdHolder crossIdHolder;
@Around("@annotation(AddCrossId)")
public Object addCrossId(ProceedingJoinPoint joinPoint) throws Throwable {
try {
if (joinPoint.getArgs().length > 0) {
Object event = joinPoint.getArgs()[0];
AddCrossId annotation = ((MethodSignature) joinPoint.getSignature()).getMethod().getAnnotation(AddCrossId.class);
crossIdHolder.addInitiator(annotation.initiator());
log(event, annotation.logFieldNamePrefix());
}
return joinPoint.proceed();
} finally {
crossIdHolder.clear();
}
}
private void log(Object event, String logFieldName) {
String prettyEvent;
try {
prettyEvent = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(event);
} catch (JsonProcessingException e) {
prettyEvent = event != null ? event.toString() : null;
}
MDC.put(logFieldName, prettyEvent);
log.trace("Получено {}", event != null ? event.getClass().getSimpleName() : null);
MDC.remove(logFieldName);
}
}
В нём устанавливаем инициатора и логируем, что стартует асинхронный метод с таким-то параметром.


Проблема 7. Хотим в логах видеть сквозной идентификатор в логике, запущенной при чтении кафки
Если пришёл rest-запрос по заявке, мы видим логи, если фильтруем по идентификатору. Если запускается какая-то асинхронная логика, стартуемая при обработке rest-запроса, мы тоже их найдём по сквозному идентификатору. Но если по событию в кафке запускается логика, также связанная с нашей сущностью, увидеть её в логе, фильтруя по нашему идентификатору, мы не сможем.
Решение
Чтобы не зависеть от формата события, надо как с REST, кидать идентификаторы в заголовки события.
Для этого в класс CrossIdHolder
добавим метод
public class CrossIdHolder {
/* Написанный ранее код */
public void addHeaders(ProducerRecord<String, ?> record) {
for (Id id : get()) {
record.headers().add(id.getName().getKey(), id.getValue().getBytes(StandardCharsets.UTF_8));
}
}
}
Этот метод берёт из контекста текущие идентификаторы и всех их прокидывает в заголовки в сообщение кафки. Его надо вызвать перед отправкой сообщения через restTemplate.
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageProducer {
@Value("${spring.kafka.topic}")
private final String kafkaTopic;
private final KafkaTemplate<String, ClaimEvent> kafkaTemplate;
private final CrossIdHolder crossIdHolder;
public void sendMessage(ClaimEvent claimEvent) {
String claimId = claimEvent.getClaimId().toString();
ProducerRecord<String, ClaimEvent> record = new ProducerRecord<>(kafkaTopic, claimId,
claimEvent);
crossIdHolder.addHeaders(record); //Вот этот вызов
kafkaTemplate.send(record);
}
}
Небольшое уточнение. Для того чтобы заполнять заголовки нужными нам данными, нам нужно создавать объект класса ProducerRecord
и использовать в KafkaTemplate
метод, который принимает объект этого класса. Иначе заголовки в событие мы не добавим.
Половина дела сделана. Теперь нужно обработать заголовки при чтении сообщения. Возвращаемся к нашим аспектам и добавим новый.
public class AddCrossIdAspect {
/* Написанный ранее код */
@Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
public Object addCrossIdFromKafkaEvent(ProceedingJoinPoint joinPoint) throws Throwable {
try {
if (joinPoint.getArgs().length > 0) {
crossIdHolder.addInitiator("kafka");
Object event = joinPoint.getArgs()[0];
if (event instanceof ConsumerRecord<?,?> consumerRecord) {
addCrossId(consumerRecord);
log(
consumerRecord.value(),
String.format(BODY_FIELD_NAME_TEMPLATE, "kafka.event")
);
}
}
return joinPoint.proceed();
} finally {
crossIdHolder.clear();
}
}
protected void addCrossId(ConsumerRecord<?,?> consumerRecord) {
for (IdName field : CrossIdHolder.ID_NAMES) {
Header header = consumerRecord.headers().lastHeader(field.getKey());
if (header == null) {
continue;
}
crossIdHolder.add(field, new String(header.value()));
}
}
}
Теперь при чтении события кафки, мы аспектом прочтём все прокинутые сквозные параметры и добавим их в нужные контексты. При этом сам код слушателя никак модифицировать не надо кроме того, что он должен принимать объект класса ConsumerRecord
, если раньше делал по другому.

Итог
Теперь для того, чтобы увидеть все логи по конкретной сущности: запущенной заявке, запросе вызывающей системы, договору, идентификатору клиента или по любому другому нам достаточно в opensearch добавить фильтр по идентификатору сущности. Это позволит нам увидеть полную историю заявки: все rest-запросы и ответы, все сообщения кафки, а также выводы в лог, генерируемые в нашем коде. Имея эту историю, мы уже сможем добавлять другие фильтры и смотреть логи этой заявки, фильтруя по конкретному сервису и/или по тексту, который мы ввели в строке поиска. Для этого нам нужно в некоторых случаях просто подключить, созданный нами стартер. А в других — подключить стартер, а также добавить в сервис немного уточняющего кода: аннотации, вызов некоторых методов или имплементацию некоторых интерфейсов.