В Scalac мы ежедневно разрабатываем и внедряем распределенные приложения с высокой степенью параллелизма. Распределенные системы в настоящее время активно развиваются и не собираются в этом останавливаться. В архитектуре подобных систем, помимо Kubernetes, важное место занимает Apache Kafka.
Мы используем Apache Kafka как основу для асинхронного взаимодействия микросервисов. Простота масштабирования, устойчивость к потере и повреждению данных, репликация и легко достижимый параллелизм через консьюмер-группы (consumer groups) — вот только некоторые из причин, почему Kafka является одним из основных инструментов построения распределенных систем.
Scala, ZIO и, наконец, Apache Kafka
Не секрет, что мы любим Scala и функциональное программирование. Данная статья предполагает, что у вас есть некоторое знакомство с библиотеками функциональных эффектов, такими как ZIO, и базовое понимание Apache Kafka. Тем не менее, хотя эта статья ориентирована на разработчиков уже знакомых с ZIO, которые хотят разобраться с тем, как интегрировать Kafka в ZIO-проекты, я все-таки расскажу об основах ZIO, ZIO Streams, а также о реализации сервисов потоковой обработки данных с использованием принципов функционального программирования.
Давайте начнем.
Основы ZIO
ZIO — это самостоятельная библиотека без дополнительных зависимостей для асинхронного и параллельного программирования. Благодаря неблокирующим файберам, не расходующим ресурсы впустую, ZIO позволяет создавать масштабируемые, устойчивые и реактивные приложения, отвечающие потребностям вашего бизнеса. Узнать больше о ZIO и его преимуществах можно в официальной документации или в нашей недавней статье.
ZIO Streams
Функциональные потоки как абстракция, часто используются для элегантного решения проблем параллелизма и обработки неограниченного потока данных, ставя в приоритет безопасность ресурсов и использование процессора.
ZIO Streams очень похож на тип ZIO, о котором мы говорили в предыдущей статье. Он выглядит следующим образом:
ZStream[-R, +E, +O]
ZStream
принимает окружение типа R, тип ошибки E, с которой может упасть, и тип O, возвращаемый в случае успешного выполнения. Вы также можете думать о ZStream как о значении ZIO, которое может выдавать несколько значений вместо одного.
ZStream — это чисто функциональный pull-поток, то есть ленивый (lazy) и с обратным давлением (backpressure), что освобождает разработчика от необходимости вручную управлять и писать код обработки обратного давления.
Неудивительно, что ZIO Kafka основана на ZIO и ZIO Streams, обеспечивая удобный интерфейс для подписки, процессинга и коммита записей Kafka наряду с уже существующими операторами из ZIO Streams. Таким образом, после настройки входящего ZStream нет никакой разницы между тем, откуда поступают данные: из Kafka, файловой системы или любого другого источника. Операторы обработки будут полностью идентичны. Хотя при обработке записей из Kafka есть и некоторые специфические операции, такие как ручной коммит смещения (offset commit).
ZIO Kafka
Apache Kafka — это распространенное решение для хранения событий в событийно-ориентированных приложениях. Поскольку топики (topic) Kafka постоянно наполняются новыми событиями, то событийно-ориентированные приложения выглядят как бесконечные циклы, в которых принимаются события и каким-то образом обрабатываются. Это может быть преобразование и повторная отправка в Kafka или запись в какое-то хранилище данных для построения материализованного представления или какого-то агрегата.
Конечно, вполне допустимо использовать обычного клиента Kafka, но крайне важно, реализовать следующие возможности: буферизация, агрегирование пакетов записей до заданного тайм-аута и управление количеством отправляемых сообщений в единицу времени. Это довольно нетривиальные задачи, которые будут отвлекать разработчика и задерживать реализацию бизнес-требований. А теперь представьте, что вам нужна асинхронность и неблокирующая синхронизация. Звучит чрезвычайно сложно. Потому что так оно и есть. К счастью, вся эта функциональность доступна в ZIO Streams. И это, вероятно, основная причина, по которой вам следует отказаться от обычного клиента Kafka.
Итак, если вы хотите сосредоточиться на бизнес-логике и возложить на библиотеку все сложности, такие как безопасность ресурсов и уровень параллелизма вашего пайплайна обработки событий, то вам определенно стоит обратить внимание на ZIO Streams. А если ваша система построена на основе Apache Kafka, то вам, несомненно, понравится ZIO Kafka.
Микросервисы с ZIO и Kafka
Теперь, после того как мы разобрались с основами ZIO, ZIO Streams и ZIO Kafka, пришло время рассмотреть реализацию системы, использующей все эти технологии.
Наша система состоит из двух сервисов. Первый — это сервис producer, который генерирует события и отправляет их в Kafka, второй — processor, который получает события, отправленные producer, потом обращается к некоторому внешнему API за дополнительной информацией, вносит эту информацию в исходное событие и отправляет это дополненное сообщение обратно в Kafka.
Полная реализация доступна в репозитории, а здесь мы продолжим разбирать ее по частям.
Проект реализован как многомодульный sbt-проект. Модуль kafka запускает встроенную Kafka, используемую для разработки. В модуле protocol описаны события в виде кейс-классов. Для сообщений Kafka будем использовать JSON, поэтому в protocol также реализован codec для JSON.
Первый кейс-класс из модуля protocol — это TransactionRaw
.
TransactionRaw
final case class TransactionRaw(userId: Long, country: String, amount: BigDecimal)
object TransactionRaw {
implicit val codec: Codec[TransactionRaw] = deriveCodec[TransactionRaw]
}
TransactionRaw
моделирует события, которые публикуются сервисом producer.
Следующим событием в нашей системе будет TransactionEnriched
.
TransactionEnriched
final case class TransactionEnriched(userId: Long, country: Country, amount: BigDecimal)
object TransactionEnriched {
implicit val codec: Codec[TransactionEnriched] = deriveCodec[TransactionEnriched]
}
final case class Country(
name: String,
capital: String,
region: String,
subregion: String,
population: Long
)
object Country {
implicit val codec: Codec[Country] = deriveCodec[Country]
}
TransactionEnriched
содержит те же поля, что и TransactionRaw
, но, как вы уже догадались, дополнен информацией о стране (Country
). За получение сведений о стране из внешнего API и создание TransactionEnriched
отвечает сервис processor.
Давайте рассмотрим код producer.
object ProducerApp extends App {
override def run(args: List[String]) =
program.provideSomeLayer[Any with Blocking](appLayer).exitCode
private lazy val program =
ZStream
.fromIterable(EventGenerator.transactions)
.map(toProducerRecord)
.mapM { producerRecord =>
log.info(s"Producing $producerRecord to Kafka...") *>
Producer.produce[Any, Long, String](producerRecord)
}
.runDrain
private lazy val appLayer = {
val producerSettings = ProducerSettings(List("localhost:9092"))
val producerLayer = Producer.make[Any, Long, String](
producerSettings, Serde.long, Serde.string
).toLayer
val loggingLayer = Slf4jLogger.make { (context, message) =>
val correlationId = LogAnnotation.CorrelationId.render(
context.get(LogAnnotation.CorrelationId))
"[correlation-id = %s] %s".format(correlationId, message)
}
loggingLayer ++ producerLayer
}
private def toProducerRecord(transaction: TransactionRaw): ProducerRecord[Long, String] =
new ProducerRecord("transactions.raw", transactionRaw.userId, transactionRaw.asJson.toString)
}
Основная логика представлена в program, где мы сначала создаем ZStream из некоторого жестко заданного списка транзакций TransactionRaw
, а затем преобразуем элементы потока в ProducerRecord
, чтобы их можно было отправить с помощью Kafka Producer. После создадим ZIO Layer для Kafka и логера. Если вы уже сталкивались с каким-либо потокоориентированным кодом, то здесь вам все должно быть знакомо. Единственное, могут вызвать недоумение такие операторы как mapM
и runDrain
, поэтому я их поясню.
Как и стандартный оператор map
, преобразующий элементы потока с помощью предоставленной функции, mapM
делает то же самое, но в него передается эффект. Например, если map требует функцию f: A => B
, то mapM
— функцию f: A => F[B]
, где F
может быть IO
, Task
или каким-либо другим эффектом.
Поскольку наш основной метод в ZIO-приложении должен возвращать тип ZIO, нам необходимо преобразовать ZStream
в ZIO. Именно для этого преобразования используется оператор runDrain
, который запускает поток для выполнения только указанных эффектов. Смотрите исходный код.
Мы постепенно подошли к основному сервису — processor. Так как processor довольно большой, не будем рассматривать каждую строчку кода, а обратим внимание на наиболее важные фрагменты.
Как мы уже говорили, событийно-управляемые микросервисы проектируются как бесконечные циклы, которые постоянно опрашивают сообщения в Kafka и реагируют на них тем или иным образом. Главным компонентом нашего сервиса является Pipeline, который представляет собой ZLayer
, содержащий метод run
, выполняющийся бесконечно.
lazy val live: ZLayer[PipelineEnvironment, Nothing, Pipeline] =
ZLayer.fromFunction { env =>
new Service {
override def run(): IO[Throwable, Unit] =
(log.info("Starting processing pipeline") *>
Consumer
.subscribeAnd(Subscription.topics("transactions.raw"))
.plainStream(Serde.long, Serde.string)
.mapM { cr =>
val parsed = decode[TransactionRaw](cr.value)
parsed match {
case Right(transactionRaw) =>
Enrichment
.enrich(transactionRaw)
.map(toProducerRecord)
.flatMap(Producer.produce[Any, Long, String](_))
.as(cr)
case Left(error) =>
(log.info(s"Deserialization error $error")
*> ZIO.succeed(cr))
}
}
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapM(_.commit)
.runDrain)
.provide(env)
}
}
Вся логика инкапсулирована в этом сервисе. После того как Consumer подпишется на указанный топик, он вернет абстракцию на основе ZStream
над Kafka Topic, в которой обрабатываются элементы потока. Когда событие получено, оно парсится из JSON-строки в кейс-класс TransactionRaw
, после чего используется сервис Enrichment для связи с внешним API, чтобы получить сведения о стране и создать событие TransactionEnriched
. Если сообщение не может быть получено из JSON, то логируется сообщение об ошибке и передается дальше в исходном виде.
Далее рассмотрим сервис Enrichment
:
lazy val live: ZLayer[
Logging with CountryCache with SttpClient, Nothing, Enrichment
] = ZLayer.fromFunction { env =>
new Service {
override def enrich(
transactionRaw: TransactionRaw
): IO[ProcessorError, TransactionEnriched] =
(for {
_ <- log.info("Getting country details from cache for")
country <- CountryCache.get(transactionRaw.country)
result <- country.fold(
fetchAndCacheCountryDetails(transactionRaw.country)
)(ZIO.succeed(_))
} yield toTransactionEnriched(
transactionRaw, result
)).provide(env)
}
}
И несколько вспомогательных методов для этого сервиса:
private def fetchAndCacheCountryDetails(countryName: String): ZIO[Logging with CountryCache with SttpClient, ProcessorError, Country] =
for {
_ <- log.info(s"Cache miss. Fetching details from external API.")
country <- fetchCountryDetails(countryName)
_ <- CountryCache.put(country)
} yield country
private def fetchCountryDetails(
countryName: String
): ZIO[SttpClient, ProcessorError, Country] =
for {
req <- ZIO.succeed(
basicRequest.get(urlOf(countryName)).response(asJson[List[Country]])
)
res <- SttpClient.send(req).orElseFail(CountryApiUnreachable)
country <- res.body.fold(
_ => ZIO.fail(ResponseExtractionError),
res => ZIO.succeed(res.head)
)
} yield country
Возможно, вы заметили, что сервис Enrichment
использует CountryCache
, который представляет собой слой, отвечающий за кэширование HTTP-ответов от внешнего API.
Полный исходный код доступен в репозитории.
Резюме
В этом посте мы рассмотрели, что такое ZIO и как его использовать в сочетании с ZIO Streams для разработки микросервисов в функциональном стиле с событийно-ориентированной архитектурой. Я надеюсь, что вас заинтересовали описанные в статье преимущества функциональных стриминговых библиотек и вы начнете использовать их в своих проектах.
Статья переведена в преддверии старта курса "Scala-разработчик". Всех желающих приглашаем на бесплатный демоурок в рамках которого познакомимся с основными Scala коллекциями, рассмотрим что у них общего и в чем отличия. Разберем особенности.
andr1983
Это все хорошо, но непонятна целевая аудитория. Для того чтобы заинтересовать со стороны - выглядит сложно. Для тех, кто начинает разбираться по сути копипаста официальной документации.
Интересно было бы посмотреть на более продвинутые примеры. Как реализовать стейтфул процессинг, например.