Привет, Хабр! Сегодня рассмотрим, как реализовать паттерн Composite в Kotlin с помощью sealed-классов и корутин. Если у вас есть сложная система с кучей объектов — простых и составных — и вы хотите управлять ими, не теряя асинхронности, то этот гайд для вас.
Немного теории
Представьте, у вас есть дерево объектов. Не реальное дерево, конечно, а структурное: корень, ветки, листья. Причём некоторые листья могут быть такими же деревьями. Вот в этом-то хаосе вам и нужно как-то работать — скажем, взять и применить операцию ко всем элементам, не заботясь, кто там ветка, кто лист. Composite — это как универсальный интерфейс, который позволяет обращаться с составными и одиночными объектами одинаково. Вместо тысячи if-else
можно получить довольно стройную иерархию, где всё просто: композиты содержат другие компоненты, а листья выполняют конкретную работу. Лепота!
Но на практике есть нюанс. Дерево-то может быть не просто деревом, а лесом, где операции происходят асинхронно. Пока один компонент зависает на своей долгой задаче, другой уже завершил работу. Тут Composite начинает выглядеть немного иначе: он всё ещё объединяет объекты, но теперь работает с учётом параллельных процессов. Поэтому-то мы и берём в руки корутины — чтобы каждая ветка делала своё дело, а результат собирался сам собой.
Реализуем базовый компонент
Начнём с создания базового абстрактного класса Component
, который будет sealed-классом:
sealed class Component {
abstract suspend fun execute(): Result
}
Обратите внимание на suspend
в функции execute()
— это значит, что операции могут быть асинхронными. Также возвращаем Result
, чтобы аккуратно работать с ошибками.
Создаём листья дерева
Листья — это конечные объекты, которые не содержат других компонентов. Реализуем их:
class Leaf(private val name: String) : Component() {
override suspend fun execute(): Result {
return try {
println("Лист [$name]: начало выполнения")
delay(1000) // Эмулируем долгую операцию
println("Лист [$name]: выполнение завершено")
Result.success(Unit)
} catch (e: Exception) {
println("Лист [$name]: ошибка — ${e.message}")
Result.failure(e)
}
}
}
Тут всё просто: имитируем работу с помощью delay
, обрабатываем возможные исключения и возвращаем результат.
Создаём композитные узлы
Композитные узлы могут содержать другие компоненты, будь то листья или другие композиты:
class Composite(private val name: String) : Component() {
private val children = mutableListOf()
fun add(component: Component) = children.add(component)
fun remove(component: Component) = children.remove(component)
override suspend fun execute(): Result = coroutineScope {
println("Композит [$name]: начало выполнения")
val results = children.map { child ->
async {
child.execute()
}
}.awaitAll()
val failures = results.filterIsInstance<>()
if (failures.isNotEmpty()) {
println("Композит [$name]: обнаружены ошибки")
Result.failure(Exception("Ошибки в дочерних компонентах"))
} else {
println("Композит [$name]: выполнение успешно завершено")
Result.success(Unit)
}
}
}
Здесь используем coroutineScope
и async
для параллельного выполнения операций в дочерних компонентах. Это значит, что все компоненты будут выполняться одновременно, а мы дождёмся их завершения с помощью awaitAll()
.
Собираем всё вместе
Теперь создадим дерево компонентов и запустим выполнение:
suspend fun main() = coroutineScope {
val root = Composite("Root")
val branch1 = Composite("Branch1")
val branch2 = Composite("Branch2")
val leaf1 = Leaf("Leaf1")
val leaf2 = Leaf("Leaf2")
val leaf3 = Leaf("Leaf3")
val leaf4 = Leaf("Leaf4")
branch1.add(leaf1)
branch1.add(leaf2)
branch2.add(leaf3)
branch2.add(leaf4)
root.add(branch1)
root.add(branch2)
val result = root.execute()
if (result.isSuccess) {
println("Все операции успешно завершены!")
} else {
println("При выполнении произошли ошибки: ${result.exceptionOrNull()?.message}")
}
}
Ожидаемый вывод:
Композит [Root]: начало выполнения
Композит [Branch1]: начало выполнения
Композит [Branch2]: начало выполнения
Лист [Leaf1]: начало выполнения
Лист [Leaf2]: начало выполнения
Лист [Leaf3]: начало выполнения
Лист [Leaf4]: начало выполнения
Лист [Leaf1]: выполнение завершено
Лист [Leaf2]: выполнение завершено
Композит [Branch1]: выполнение успешно завершено
Лист [Leaf3]: выполнение завершено
Лист [Leaf4]: выполнение завершено
Композит [Branch2]: выполнение успешно завершено
Композит [Root]: выполнение успешно завершено
Все операции успешно завершены!
Добавляем ложку дёгтя: обработка ошибок
Но жизнь не всегда так идеальна. Представим, что один из листов выбрасывает исключение:
class Leaf(private val name: String) : Component() {
override suspend fun execute(): Result {
return try {
println("Лист [$name]: начало выполнения")
if (name == "Leaf3") {
throw RuntimeException("Что-то пошло не так в $name")
}
delay(1000)
println("Лист [$name]: выполнение завершено")
Result.success(Unit)
} catch (e: Exception) {
println("Лист [$name]: ошибка — ${e.message}")
Result.failure(e)
}
}
}
Теперь Leaf3
всегда будет выбрасывать исключение. Запустим код снова.
Композит [Root]: начало выполнения
Композит [Branch1]: начало выполнения
Композит [Branch2]: начало выполнения
Лист [Leaf1]: начало выполнения
Лист [Leaf2]: начало выполнения
Лист [Leaf3]: начало выполнения
Лист [Leaf4]: начало выполнения
Лист [Leaf3]: ошибка — Что-то пошло не так в Leaf3
Лист [Leaf1]: выполнение завершено
Лист [Leaf2]: выполнение завершено
Композит [Branch1]: выполнение успешно завершено
Лист [Leaf4]: выполнение завершено
Композит [Branch2]: обнаружены ошибки
Композит [Root]: обнаружены ошибки
При выполнении произошли ошибки: Ошибки в дочерних компонентах
Ошибка в Leaf3
корректно обрабатывается и приводит к неуспешному завершению родительских композитов.
Ограничение параллелизма
Допустим, у нас тысячи компонентов. Запускать тысячи корутин одновременно — не лучшая идея. Добавим ограничение параллелизма с помощью Semaphore
:
class Composite(private val name: String, private val concurrency: Int = 4) : Component() {
private val children = mutableListOf<Component>()
private val semaphore = Semaphore(concurrency)
// Методы add и remove остаются прежними
override suspend fun execute(): Result = coroutineScope {
println("Композит [$name]: начало выполнения")
val results = children.map { child ->
async {
semaphore.withPermit {
child.execute()
}
}
}.awaitAll()
// Обработка результатов остаётся прежней
}
}
Теперь мы ограничили количество одновременно выполняющихся корутин до concurrency
.
Загрузка данных из сети
Допустим, у нас есть приложение. Оно, бедняга, должно общаться с кучей API, собирать данные и всё это максимально быстро. Одни запросы можно отправлять параллельно, другие — строго по порядку.
Каждый загрузчик выполняет свою задачу: стучится к API, получает ответ, проверяет, не подкинули ли ему 500-й статус, и возвращает результат. Бдуем юзать Ktor Client:
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
sealed class Component {
abstract suspend fun execute(): Result<data>
}
data class Data(val content: String)
class ApiLoader(private val url: String, private val client: HttpClient) : Component() {
override suspend fun execute(): Result<data> {
return try {
println("Загружаем данные с $url...")
val response: String = client.get(url)
println("Данные с $url успешно получены.")
Result.success(Data(response))
} catch (e: Exception) {
println("Ошибка при загрузке $url: ${e.message}")
Result.failure(e)
}
}
}
Мы явно передаём клиент через конструктор. Это удобно: можно переиспользовать его, добавить тайм-ауты или мокнуть в тестах.
Далее уже нужны два типа композитов: один для параллельной загрузки, другой для последовательной. Первый отправляет всех своих загрузчиков работать одновременно. Второй в роли некого охранника заставляет выполнять работу по очереди.
Параллельный композит:
class ParallelComposite(private val name: String) : Component() {
private val children = mutableListOf()
fun add(component: Component) = children.add(component)
override suspend fun execute(): Result<data> = coroutineScope {
println("Композит [$name]: параллельное выполнение начато.")
val results = children.map { child ->
async {
child.execute()
}
}.awaitAll()
val failures = results.filter { it.isFailure }
if (failures.isNotEmpty()) {
val exceptions = failures.mapNotNull { it.exceptionOrNull() }
println("Композит [$name]: обнаружены ошибки.")
Result.failure(CompositeException(exceptions))
} else {
val combinedData = results.map { it.getOrThrow().content }.joinToString("\n")
println("Композит [$name]: все операции успешно завершены.")
Result.success(Data(combinedData))
}
}
}
Последовательный композит:
class SequentialComposite(private val name: String) : Component() {
private val children = mutableListOf()
fun add(component: Component) = children.add(component)
override suspend fun execute(): Result<data> {
println("Композит [$name]: последовательное выполнение начато.")
val combinedContent = StringBuilder()
for (child in children) {
val result = child.execute()
if (result.isFailure) {
println("Композит [$name]: ошибка при выполнении.")
return result
}
combinedContent.append(result.getOrThrow().content).append("\n")
}
println("Композит [$name]: успешно завершено.")
return Result.success(Data(combinedContent.toString()))
}
}
Все готово.
Далее добавим обработку ошибок:
class CompositeException(val errors: List<Throwable>) : Exception(
"Ошибки в композите: ${errors.joinToString { it.message ?: "Неизвестная ошибка" }}"
)
Когда один из загрузчиков облажается (а это может случится), мы соберём все исключения в одном месте и вернём их как результат композита.
Соберём параллельный и последовательный загрузчики в общий композит:
suspend fun main() {
val client = HttpClient(CIO)
// Загрузчики
val loader1 = ApiLoader("https://api.example.com/data1", client)
val loader2 = ApiLoader("https://api.example.com/data2", client)
val loaderError = ApiLoader("https://api.example.com/error", client)
// Параллельный композит
val parallelComposite = ParallelComposite("ParallelGroup")
parallelComposite.add(loader1)
parallelComposite.add(loader2)
// Последовательный композит
val sequentialComposite = SequentialComposite("SequentialGroup")
sequentialComposite.add(parallelComposite)
sequentialComposite.add(loaderError)
// Общий корневой композит
val rootComposite = ParallelComposite("RootGroup")
rootComposite.add(sequentialComposite)
// Выполняем
val result = rootComposite.execute()
if (result.isSuccess) {
println("Все данные успешно загружены:")
println(result.getOrThrow().content)
} else {
println("Ошибки при выполнении:")
val exception = result.exceptionOrNull()
if (exception is CompositeException) {
exception.errors.forEach { println("- ${it.message}") }
} else {
println("- ${exception?.message}")
}
}
client.close()
}
26 ноября в Otus пройдет открытый урок на тему «Макросы и другие помощники Dart/Flutter». На нем участники научатся создавать и использовать макросы, разберут принципы генерации кода через source_gen и build_runner и упростят себе жизнь с помощью mason bricks.
Если тема показалась для вас актуальной, записаться можно на странице курса "Flutter Mobile Developer". А в календаре мероприятий можно посмотреть список всех открытых уроков по другим IT-направлениям.