В Scalac мы ежедневно разрабатываем и внедряем распределенные приложения с высокой степенью параллелизма. Распределенные системы в настоящее время активно развиваются и не собираются в этом останавливаться. В архитектуре подобных систем, помимо Kubernetes, важное место занимает Apache Kafka.

Мы используем Apache Kafka как основу для асинхронного взаимодействия микросервисов. Простота масштабирования, устойчивость к потере и повреждению данных, репликация и легко достижимый параллелизм через консьюмер-группы (consumer groups) — вот только некоторые из причин, почему Kafka является одним из основных инструментов построения распределенных систем.

Scala, ZIO и, наконец, Apache Kafka

Не секрет, что мы любим Scala и функциональное программирование. Данная статья предполагает, что у вас есть некоторое знакомство с библиотеками функциональных эффектов, такими как ZIO, и базовое понимание Apache Kafka. Тем не менее, хотя эта статья ориентирована на разработчиков уже знакомых с ZIO, которые хотят разобраться с тем, как интегрировать Kafka в ZIO-проекты, я все-таки расскажу об основах ZIO, ZIO Streams, а также о реализации сервисов потоковой обработки данных с использованием принципов функционального программирования.

Давайте начнем.

Основы ZIO

ZIO — это самостоятельная библиотека без дополнительных зависимостей для асинхронного и параллельного программирования. Благодаря неблокирующим файберам, не расходующим ресурсы впустую, ZIO позволяет создавать масштабируемые, устойчивые и реактивные приложения, отвечающие потребностям вашего бизнеса. Узнать больше о ZIO и его преимуществах можно в официальной документации или в нашей недавней статье

ZIO Streams

Функциональные потоки как абстракция, часто используются для элегантного решения проблем параллелизма и обработки неограниченного потока данных, ставя в приоритет безопасность ресурсов и использование процессора.

ZIO Streams очень похож на тип ZIO, о котором мы говорили в предыдущей статье. Он выглядит следующим образом:

ZStream[-R, +E, +O]

ZStream принимает окружение типа R, тип ошибки E, с которой может упасть, и тип O, возвращаемый в случае успешного выполнения. Вы также можете думать о ZStream как о значении ZIO, которое может выдавать несколько значений вместо одного. 

ZStream — это чисто функциональный pull-поток, то есть ленивый (lazy) и с обратным давлением (backpressure), что освобождает разработчика от необходимости вручную управлять и писать код обработки обратного давления.

Неудивительно, что ZIO Kafka основана на ZIO и ZIO Streams, обеспечивая удобный интерфейс для подписки, процессинга и коммита записей Kafka наряду с уже существующими операторами из ZIO Streams. Таким образом, после настройки входящего ZStream нет никакой разницы между тем, откуда поступают данные: из Kafka, файловой системы или любого другого источника. Операторы обработки будут полностью идентичны. Хотя при обработке записей из Kafka есть и некоторые специфические операции, такие как ручной коммит смещения (offset commit).

ZIO Kafka

Apache Kafka — это распространенное решение для хранения событий в событийно-ориентированных приложениях. Поскольку топики (topic) Kafka постоянно наполняются новыми событиями, то событийно-ориентированные приложения выглядят как бесконечные циклы, в которых принимаются события и каким-то образом обрабатываются. Это может быть преобразование и повторная отправка в Kafka или запись в какое-то хранилище данных для построения материализованного представления или какого-то агрегата.

Конечно, вполне допустимо использовать обычного клиента Kafka, но крайне важно, реализовать следующие возможности: буферизация, агрегирование пакетов записей до заданного тайм-аута и управление количеством отправляемых сообщений в единицу времени. Это довольно нетривиальные задачи, которые будут отвлекать разработчика и задерживать реализацию бизнес-требований. А теперь представьте, что вам нужна асинхронность и неблокирующая синхронизация. Звучит чрезвычайно сложно. Потому что так оно и есть. К счастью, вся эта функциональность доступна в ZIO Streams. И это, вероятно, основная причина, по которой вам следует отказаться от обычного клиента Kafka.

Итак, если вы хотите сосредоточиться на бизнес-логике и возложить на библиотеку все сложности, такие как безопасность ресурсов и уровень параллелизма вашего пайплайна обработки событий, то вам определенно стоит обратить внимание на ZIO Streams. А если ваша система построена на основе Apache Kafka, то вам, несомненно, понравится ZIO Kafka.

Микросервисы с ZIO и Kafka

Теперь, после того как мы разобрались с основами ZIO, ZIO Streams и ZIO Kafka, пришло время рассмотреть реализацию системы, использующей все эти технологии.

Наша система состоит из двух сервисов. Первый — это сервис producer, который генерирует события и отправляет их в Kafka, второй — processor, который получает события, отправленные producer, потом обращается к некоторому внешнему API за дополнительной информацией, вносит эту информацию в исходное событие и отправляет это дополненное сообщение обратно в Kafka. 

Полная реализация доступна в репозитории, а здесь мы продолжим разбирать ее по частям.

Проект реализован как многомодульный sbt-проект. Модуль kafka запускает встроенную Kafka, используемую для разработки. В модуле protocol описаны события в виде кейс-классов. Для сообщений Kafka будем использовать JSON, поэтому в protocol также реализован codec для  JSON.

Первый кейс-класс из модуля protocol — это TransactionRaw.

TransactionRaw

final case class TransactionRaw(userId: Long, country: String, amount: BigDecimal)

object TransactionRaw {
    implicit val codec: Codec[TransactionRaw] = deriveCodec[TransactionRaw]
}

TransactionRaw моделирует события, которые публикуются сервисом producer

Следующим событием в нашей системе будет TransactionEnriched.

TransactionEnriched

final case class TransactionEnriched(userId: Long, country: Country, amount: BigDecimal)

object TransactionEnriched {
  implicit val codec: Codec[TransactionEnriched] = deriveCodec[TransactionEnriched]
}

final case class Country(
    name: String,
    capital: String,
    region: String,    
    subregion: String,  
    population: Long
)

object Country {
  implicit val codec: Codec[Country] = deriveCodec[Country]
}

TransactionEnriched содержит те же поля, что и TransactionRaw, но, как вы уже догадались, дополнен информацией о стране (Country). За получение сведений о стране из внешнего API и создание TransactionEnriched отвечает сервис processor

Давайте рассмотрим код producer.

object ProducerApp extends App {
  override def run(args: List[String]) =
    program.provideSomeLayer[Any with Blocking](appLayer).exitCode

  private lazy val program =
    ZStream
      .fromIterable(EventGenerator.transactions)
      .map(toProducerRecord)
      .mapM { producerRecord =>
        log.info(s"Producing $producerRecord to Kafka...") *>
          Producer.produce[Any, Long, String](producerRecord)
      }
      .runDrain

  private lazy val appLayer = {
    val producerSettings = ProducerSettings(List("localhost:9092"))
    val producerLayer = Producer.make[Any, Long, String](
        producerSettings, Serde.long, Serde.string
    ).toLayer

    val loggingLayer = Slf4jLogger.make { (context, message) =>
      val correlationId = LogAnnotation.CorrelationId.render(          
       context.get(LogAnnotation.CorrelationId))
      "[correlation-id = %s] %s".format(correlationId, message)
    }

    loggingLayer ++ producerLayer
  }

  private def toProducerRecord(transaction: TransactionRaw): ProducerRecord[Long, String] =
    new ProducerRecord("transactions.raw", transactionRaw.userId, transactionRaw.asJson.toString)
}

Основная логика представлена в program, где мы сначала создаем ZStream из некоторого жестко заданного списка транзакций TransactionRaw, а затем преобразуем элементы потока в ProducerRecord, чтобы их можно было отправить с помощью Kafka Producer. После создадим ZIO Layer для Kafka и логера. Если вы уже сталкивались с каким-либо потокоориентированным кодом, то здесь вам все должно быть знакомо. Единственное, могут вызвать недоумение такие операторы как mapM и runDrain, поэтому я их поясню. 

Как и стандартный оператор map, преобразующий элементы потока с помощью предоставленной функции, mapM делает то же самое, но в него передается эффект. Например, если map требует функцию f: A => B, то mapM — функцию f: A => F[B], где F может быть IO, Task или каким-либо другим эффектом. 

Поскольку наш основной метод в ZIO-приложении должен возвращать тип ZIO, нам необходимо преобразовать ZStream в ZIO. Именно для этого преобразования используется оператор runDrain, который запускает поток для выполнения только указанных эффектов. Смотрите исходный код.

Мы постепенно подошли к основному сервису — processor. Так как processor довольно большой, не будем рассматривать каждую строчку кода, а обратим внимание на наиболее важные фрагменты.

Как мы уже говорили, событийно-управляемые микросервисы проектируются как бесконечные циклы, которые постоянно опрашивают сообщения в Kafka и реагируют на них тем или иным образом. Главным компонентом нашего сервиса является Pipeline, который представляет собой ZLayer, содержащий метод run, выполняющийся бесконечно.

lazy val live: ZLayer[PipelineEnvironment, Nothing, Pipeline] =
    ZLayer.fromFunction { env =>
      new Service {
        override def run(): IO[Throwable, Unit] =
          (log.info("Starting processing pipeline") *>
            Consumer
              .subscribeAnd(Subscription.topics("transactions.raw"))
              .plainStream(Serde.long, Serde.string)
              .mapM { cr =>
                val parsed = decode[TransactionRaw](cr.value)

                parsed match {
                  case Right(transactionRaw) =>
                    Enrichment
                      .enrich(transactionRaw)
                      .map(toProducerRecord)
                      .flatMap(Producer.produce[Any, Long, String](_))
                      .as(cr)
                  case Left(error) =>
                    (log.info(s"Deserialization error $error")
                     *> ZIO.succeed(cr))
                }
              }
              .map(_.offset)
              .aggregateAsync(Consumer.offsetBatches)
              .mapM(_.commit)
              .runDrain)
            .provide(env)
      }
    }

Вся логика инкапсулирована в этом сервисе. После того как Consumer подпишется на указанный топик, он вернет абстракцию на основе ZStream над Kafka Topic, в которой обрабатываются элементы потока. Когда событие получено, оно парсится из JSON-строки в кейс-класс TransactionRaw, после чего используется сервис Enrichment для связи с внешним API, чтобы получить сведения о стране и создать событие TransactionEnriched. Если сообщение не может быть получено из JSON, то логируется сообщение об ошибке и передается дальше в исходном виде.

Далее рассмотрим сервис Enrichment:

lazy val live: ZLayer[
  Logging with CountryCache with SttpClient, Nothing, Enrichment
] = ZLayer.fromFunction { env =>
    new Service {
      override def enrich(
        transactionRaw: TransactionRaw
      ): IO[ProcessorError, TransactionEnriched] =
        (for {
          _       <- log.info("Getting country details from cache for")
          country <- CountryCache.get(transactionRaw.country)
          result  <- country.fold(
            fetchAndCacheCountryDetails(transactionRaw.country)
          )(ZIO.succeed(_))
        } yield toTransactionEnriched(
          transactionRaw, result
        )).provide(env)
      }
  }

И несколько вспомогательных методов для этого сервиса:

private def fetchAndCacheCountryDetails(countryName: String): ZIO[Logging with CountryCache with SttpClient, ProcessorError, Country]  =
 for {
      _ <- log.info(s"Cache miss. Fetching details from external API.")
      country <- fetchCountryDetails(countryName)
      _ <- CountryCache.put(country)
  } yield country


private def fetchCountryDetails(
  countryName: String
): ZIO[SttpClient, ProcessorError, Country] =
 for { 
 req <- ZIO.succeed(
      basicRequest.get(urlOf(countryName)).response(asJson[List[Country]])
  )
  res <- SttpClient.send(req).orElseFail(CountryApiUnreachable)
  country <- res.body.fold(
        _ => ZIO.fail(ResponseExtractionError),
        res => ZIO.succeed(res.head)
  )
} yield country

Возможно, вы заметили, что сервис Enrichment использует CountryCache, который представляет собой слой, отвечающий за кэширование HTTP-ответов от внешнего API.

Полный исходный код доступен в репозитории.

Резюме

В этом посте мы рассмотрели, что такое ZIO и как его использовать в сочетании с ZIO Streams для разработки микросервисов в функциональном стиле с событийно-ориентированной архитектурой. Я надеюсь, что вас заинтересовали  описанные в статье преимущества функциональных стриминговых библиотек и вы начнете использовать их в своих проектах.


Статья переведена в преддверии старта курса "Scala-разработчик". Всех желающих приглашаем на бесплатный демоурок в рамках которого познакомимся с основными Scala коллекциями, рассмотрим что у них общего и в чем отличия. Разберем особенности.

Комментарии (1)


  1. andr1983
    03.11.2021 23:14

    Это все хорошо, но непонятна целевая аудитория. Для того чтобы заинтересовать со стороны - выглядит сложно. Для тех, кто начинает разбираться по сути копипаста официальной документации.

    Интересно было бы посмотреть на более продвинутые примеры. Как реализовать стейтфул процессинг, например.