О чём речь

В FunBox мы делаем продукты для мобильных операторов: различные порталы, геосервисы, платежи, мобильную рекламу и многое другое. Один из наших проектов построен на микросервисной архитектуре. Основная функциональность связана с обработкой потоков событий, поэтому мы выбрали событийно-ориентированную архитектуру. Для организации централизованного, масштабируемого и быстрого механизма обмена сообщениями используем Apache Kafka. Это популярная платформа для решения подобных задач, плюс у неё есть интерфейс для организации потоковой обработки событий.

В процессе реализации сервиса, занимающегося аналитикой звонков, у нас возникла задача агрегировать несколько потоков данных во времени. Когда User 1 звонит User 2, два внешних сервиса генерируют JSON-события:

  • {"user1": id1, "user2": id2, "timestamp": ts1, "analytics": data1}

  • {"user1": id1, "user2": id2, "timestamp": ts2, "metrics": data2}

Наше приложение получает их в соответствующих топиках Kafka: analyticsTopic и metricsTopic. Отметим, что время событий ts1 и ts2 может не совпадать, допускается их различие не более чем на T секунд, где T — заданная константа.

Из статистики звонков известно, что в большинстве случаев время между двумя последовательными звонками для одной пары пользователей составляет не менее 2*Т. Но иногда какое-то событие может не попасть в Kafka по независящим от нас причинам. Разрабатываемый сервис потоковой обработки данных должен находить пары событий, соответствующие одному звонку, и выдавать в топик outputTopic агрегированные JSON-события вида {"user1": id1, "user2": id2, "ts": ts, "analytics": data1, "metrics": data2}, при этом время ts = min(ts1, ts2).

Основные понятия Kafka Streams: потоки и процессоры

Для решения таких задач мы используем Kafka Streams. Для начала рассмотрим основные понятия.

Kafka Streams — это клиентская библиотека для потоковой работы с данными в Kafka. Она предоставляет высокоуровневый Streams DSL и низкоуровневый Processor API. Центральное понятие библиотеки Kafka Streams — топология сети обработчиков. Это граф, состоящий из потоковых процессоров (узлы) и потоков (рёбра). Приложение использует топологии, чтобы преобразовывать данные и вычисления.

https://kafka.apache.org/30/images/streams-architecture-topology.jpg
https://kafka.apache.org/30/images/streams-architecture-topology.jpg

Поток — бесконечный, постоянно обновляемый набор данных. Это упорядоченная, отказоустойчивая последовательность неизменяемых записей с возможностью повторного чтения. Каждая запись определяется парой ключ–значение.

Потоковый процессор — это один шаг обработки данных, т. е. получение записи от предшествующего процессора, выполнение некоторой операции и, возможно, выдача одной или нескольких записей в последующие процессоры. Существует два особых процессора: источник и поглотитель. Источник не имеет предшествующих процессоров, он получает записи из топиков Kafka и передаёт их в последующие процессоры. Поглотитель не имеет последующих процессоров, он только передаёт в топик Kafka записи, принимаемые от предыдущих процессоров. Детальное описание можно посмотреть на сайте проекта.

Построение топологии

Наше решение охватывает несколько этапов:

  • получение и группировка входящих событий по паре абонентов;

  • объединение событий, соответствующих одному звонку;

  • отсеивание промежуточных событий и получение результата.

Рассмотрим каждый подробнее.

Получение входных данных

Kafka Streams требует указания специального класса для сериализации и десериализации данных, с которыми идёт работа. В нашем случае — для преобразования JSON в Java-объекты и обратно. Библиотека предоставляет такие классы для базовых типов, а для обработки JSON-данных нам потребуется добавить свой, работа которого основана на ObjectMapper, — JSONSerde<T>. Подробности в документации.

Входящие события будут преобразовываться из JSON в Java-объекты класса AnalyticsEvent и MetricsEvent.

public class AnalyticsEvent implements JSONSerdeCompatible {
  public String user1;
  public String user2;
  public Long timestamp;
  public String analytics;
  ...
}

public class MetricsEvent implements JSONSerdeCompatible {
  public String user1;
  public String user2;
  public Long timestamp;
  public String metrics;
  ...
}

Следует учитывать, что порядок появления входящих событий в топике Kafka может отличаться от порядка, в котором они генерируются, поэтому при обработке нужно использовать время из тела события (timestamp). Чтобы это получилось, при создании источника (source) передаётся класс, реализующий интерфейс для извлечения времени из тела события.

public class AnalyticsEventTimestampExtractor implements TimestampExtractor {
  @Override
  public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    long ts;
    Long event_ts = ((AnalyticsEvent)record.value()).timestamp;
    if (event_ts == null) {
      ts = record.timestamp();
    } else {
      ts = event_ts;
    }
    return ts;
  }
}

При создании потоков входных данных события группируются по новому ключу, содержащему пару абонентов User 1 и User 2. Благодаря этому все дальнейшие преобразования будут независимо производиться для событий, относящихся к разным парам абонентов.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, AnalyticsEvent> analyticsStream = builder
  .stream(
    analyticsTopicName,
    Consumed.with(Serdes.String(), new JSONSerde<AnalyticsEvent>(), new AnalyticsEventTimestampExtractor(), Topology.AutoOffsetReset.LATEST)
  )
  .selectKey((k, v) -> String.format("%s-%s", v.user1, v.user2));
KStream<String, MetricsEvent> metricsStream = builder
  .stream(
    metricsTopicName,
    Consumed.with(Serdes.String(), new JSONSerde<MetricsEvent>(), new MetricsEventTimestampExtractor(), Topology.AutoOffsetReset.LATEST)
  )
.selectKey((k, v) -> String.format("%s-%s", v.user1, v.user2));

Объединение событий из двух потоков

Теперь можно объединить данные из двух потоков, используя outerJoin, аналогично одноимённой операции в реляционных БД. Для временной группировки множества событий используются скользящие окна. Мы задаём ширину окна равной Т (1 минута). Также следует указать grace-интервал (3 минуты), который определяет задержку реального времени закрытия окна, чтобы дождаться запаздывающих событий. Например, событие имеет таймстамп, попадающий в данное окно, но оно записано в топик с некоторой задержкой. Если такое событие уложится в grace-интервал, то будет учтено в рассматриваемом окне.
Для каждой пары событий, попавшей в окно при выполнении outerJoin, вызывается функция объединения, задаваемая разработчиком. В нашем случае она возвращает объединённое событие класса CompoundEvent.

public class CompoundEvent implements JSONSerdeCompatible {
  public String user1;
  public String user2;
  public Long timestamp;
  public String analytics;
  public String metrics;

  public CompoundEvent merge(CompoundEvent other) {
    if (user1 == null && other.user1 != null) {
      user1 = other.user1;
    }
    if (user2 == null && other.user2 != null) {
      user2 = other.user2;
    }
    if (other.timestamp != null) {
      if (timestamp == null) {
        timestamp = other.timestamp;
      } else {
        timestamp = Math.min(timestamp, other.timestamp);
      }
    }
    if (analytics == null && other.analytics != null) {
      analytics = other.analytics;
    }
    if (metrics == null && other.metrics != null) {
      metrics = other.metrics;
    }
    return this;
  }

  public static CompoundEvent create(AnalyticsEvent analyticsEvent, MetricsEvent metricsEvent) {
    CompoundEvent compoundEvent = new CompoundEvent();
    if (analyticsEvent != null) {
      compoundEvent.user1 = analyticsEvent.user1;
      compoundEvent.user2 = analyticsEvent.user2;
      compoundEvent.analytics = analyticsEvent.analytics;
      compoundEvent.timestamp = analyticsEvent.timestamp;
    }
    if (metricsEvent != null) {
      compoundEvent.metrics = metricsEvent.metrics;
      if (compoundEvent.user1 == null) {
        compoundEvent.user1 = metricsEvent.user1;
      }
      if (compoundEvent.user2 == null) {
        compoundEvent.user2 = metricsEvent.user2;
      }
      if (compoundEvent.timestamp == null) {
        compoundEvent.timestamp = metricsEvent.timestamp;
      } else {
        compoundEvent.timestamp = Math.min(compoundEvent.timestamp, metricsEvent.timestamp);
      }
    }
    return compoundEvent;
  }
}

Так формируется первичный поток объединённых событий.

KStream<String, CompoundEvent> compoundRawStream = analyticsStream
  .outerJoin(
    metricsStream,
    (analytics, metrics) -> CompoundEvent.create(analytics, metrics),
    JoinWindows.of(Duration.ofMinutes(1L)).grace(Duration.ofMinutes(3L))
);

Первичный поток объединённых данных содержит два события для каждой пары исходных. При этом первое из них получается на основании одного исходного, а второе — на основании пары исходных. В первом случае функция объединения вызывается с аргументами (event1, null), во втором — (event1, event2). На следующем этапе требуется оставить только второе объединённое событие.

Но возможна ситуация, когда мы получили только одно исходное событие для звонка. Тогда при выполнении outerJoin будет получено только одно объединённое событие, основанное на одном исходном, которое нам и нужно оставить. Получается, что мы не можем всегда оставлять только объединённые события, содержащие данные пары исходных. Для решения такой задачи на следующем шаге применяется сессионное окно.

Отсеивание лишних событий при помощи сессионного окна и подавления

Название «сессионное окно» связано с понятием сессии пользователя, например, при взаимодействии с веб-сайтом. Оно создаётся для некоторого события в потоке и закрывается, когда интервал времени до прихода следующего события превышает время бездействия — по аналогии со временем бездействия пользователя. Затем открывается очередное сессионное окно.

https://kafka.apache.org/30/images/streams-session-windows-02.png
https://kafka.apache.org/30/images/streams-session-windows-02.png

Механизм действия сессионного окна позволяет сгруппировать события, полученные на предыдущем шаге, по принадлежности к исходному событию. Предполагается, что исходные события разделены во времени интервалом больше Т, поэтому время бездействия для закрытия сессионного окна выбираем равным Т. При накоплении объединённых событий в сессионном окне производится простая агрегация: в качестве актуального берётся последнее из поступивших в сессионное окно событий. Kafka Streams устроены так, что каждый вызов функции-агрегатора порождает выходное событие. Чтобы отсеять все порождённые события, кроме последнего, применяется подавление (suppression). В результирующем событии восстанавливается исходный ключ, и работа с окном на этом завершена.

KStream<String, CompoundEvent> compoundStream = compoundRawStream
  .groupByKey()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(1L)).grace(Duration.ZERO))
  .reduce(
    (aggCompound, nextCompound) -> aggCompound.merge(nextCompound),
    Materialized.with(Serdes.String(), new JSONSerde<CompoundEvent>())
  )
  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
  .toStream((k, v) -> k.key());

На выходе получаем одно объединённое событие, соответствующее исходному звонку. Оно отправляется в топик с результатами outputTopic.

Неожиданный поворот

Во время тестирования топологии обнаружилась неожиданная проблема. После отправки входящих событий в соответствующие топики на выходе ничего не было. Отладка и тестирование приложения Kafka Streams — обширный вопрос, достойный отдельного цикла статей, поэтому здесь рассмотрим результаты.

Выяснилось, что это известная особенность работы временных окон. Она заключается в том, что обработка событий в них основана не на системном времени (wall-clock time), а на времени потока событий (stream time), которое продвигается только при поступлении новых событий в конкретный топик и партицию. В нашем случае это приводит к необходимости получения нового события для соответствующей пары абонентов. Но ожидать очередного звонка между парой пользователей, чтобы получить данные о предыдущем, недопустимо. Подходящим решением стала генерация пустых служебных событий, расположенных во времени сразу за границей сессионного окна. Они не учитываются при объединении данных и используются лишь для актуализации времени в потоке событий. Таким образом, сессионное окно принудительно закрывается, и мы получаем результирующее событие на выходе.

Отправка пустых событий

Для реализации описанного механизма уже недостаточно возможностей высокоуровневого Streams DSL, поэтому потребовалось использовать низкоуровневый Processor API. Решение состоит из двух частей:

  1. При получении каждого CompoundEvent в отдельное хранилище помещается временная метка, смещённая на T+grace+1 секунд. Ключ записи совпадает с ключом полученного события.

  2. Запускается периодическое задание, выполняемое по системным часам (wall-clock). Оно извлекает из хранилища записи, время которых меньше текущего, генерирует и выдаёт в поток пустые события с соответствующей временной меткой.

public class WindowCloserTransformer implements Transformer<String, CompoundEvent, KeyValue<String, CompoundEvent>> {
  private ProcessorContext context;
  private KeyValueStore kvStore;
  private String storeName;
  private Long delayMs;

  public WindowCloserTransformer(String storeName, Long delayMs) {
    this.storeName = storeName;
    this.delayMs = delayMs;
  }
  
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.kvStore = context.getStateStore(this.storeName);
    context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
      try (final KeyValueIterator<String, Long> iter = kvStore.all()) {
        Set<String> toDelete = new HashSet<String>();
        while (iter.hasNext()) {
          final KeyValue<String, Long> entry = iter.next();
          if (entry.value <= timestamp) {
            context.forward(entry.key, CompoundEvent.createDummy(entry.value), To.all().withTimestamp(entry.value));
            toDelete.add(entry.key);
          }
        }
        for (String k : toDelete) {
          kvStore.delete(k);
        }
      }
    });
  }

  @Override
  public KeyValue<String, CompoundEvent> transform(String key, CompoundEvent event) {
    if (!event.dummy) {
      if (event.timestamp != null) {
        kvStore.put(key, event.timestamp + delayMs);
      }
      return KeyValue.pair(key, event);
    }
    return null;
  }

  @Override
  public void close() {
  }
}

Осталось включить реализованный процессор в нашу топологию и добавить фильтрацию пустых событий.

KStream<String, CompoundEvent> compoundRawStream = ...
KStream<String, CompoundEvent> compoundStreamWithDummy = compoundRawStream
  .transform(new TransformerSupplier() {
    public Transformer<String, CompoundEvent, KeyValue<String, CompoundEvent>> get() {
      return new WindowCloserTransformer("dummyPushStore", (1+3)*60000L);
    }
    public Set<StoreBuilder<?>> stores() {
      StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("dummyPushStore"), Serdes.String(), Serdes.Long());
      return Collections.singleton(keyValueStoreBuilder);
    }
  });
KStream<String, CompoundEvent> compoundStream = ...

Теперь временные окна закрываются вовремя, и мы получаем ожидаемые события.

Итоги

В итоге мы смогли создать необходимую топологию сети обработчиков, позволяющую агрегировать во времени два потока данных. Осталось только настроить и запустить Java-приложение.

Комментарии (2)


  1. zloyreznic
    01.02.2022 13:09

    Спасибо за статью
    Было бы отлично код в github чтоб можно было попробовать.


  1. kawena54
    03.02.2022 15:07

    про то что не закрываются окна ,какието пустые пакеты - это конечно сильно.

    сколько лет работают с 2,3м уровнем кафки никогда такого не встречал .