Всем привет! В своей работе я часто использую Kotlin для автоматизации. Деятельность моя не связана напрямую с программированием, но Котлин здорово упрощает рабочие задачи.
Недавно нужно было собрать данные немаленького размера, дабы сделать анализ, поэтому решил написать небольшой скрипт, для получения данных и сохранения их в Excel. С последним пунктом проблем не возникло - почитал про Apache POI, взял пару примеров из официальной документации, доработав под себя. Чего не скажешь про запросы в Сеть.
Источник отдавал пачками json и надо было как-то быстро эти "пачки" собирать, преобразовывая в текст и записывая в файл таблицу.
Асинхронный метод
Начать решил с простой асинхронщины. Немного поковыряв HttpUrlConnection, отправил туда, где ему и место, заменив на HttpClient из Java.
Для тестов взял сервис https://jsonplaceholder.typicode.com/, который мне подсказал один знакомый разработчик. Сохранил ссылку, которая выдает Json с комментариями в переменную, дабы не дублировать и начал тесты.
const val URL = "https://jsonplaceholder.typicode.com/comments"
Функция была готова и даже работала. Данные приходили.
fun getDataAsync(url: String): String? {
val httpClient = HttpClient.newBuilder()
.build()
val httpRequest = HttpRequest.newBuilder()
.uri(URI.create(link)).build()
return httpClient.sendAsync(httpRequest, BodyHandlers.ofString())
.join().body()
}
Теперь надо было проверить скорость работы. Вооружившись measureTimeMillis я запустил код.
val asyncTime = measureTimeMillis {
val res = (1..10)
.toList()
.map {getDataAsync("$URL/$it")}
res.forEach { println(it) }
}
println("Асинхронный запрос время $asyncTime мс")
Все работало как надо, но хотелось быстрее. Немного покопавшись в Интернете, я набрел на решение, в котором задачи выполняются параллельно.
Parallel Map
Автор в своем блоге пишет, что код ниже выполняется параллельно с использованием корутин. Ну что, я давно хотел их попробовать, а тут представилась возможность.
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> =
coroutineScope {
map { async { f(it) } }.awaitAll()
}
Если я все верно понял, то здесь расширяется стандартная коллекция (класс Iterable) функцией pmap, в которую передается лямбда. В лямбду поочередно приходит параметр A. Затем после окончания прохода по списку async дожидается выполнения всех элементов списка, и с помощью .awaitAll() выдает результат в виде списка. Причем для каждого элемента функция с модификатором suspend, то есть блокироваться она не будет.
Пришло время тестов, и сказать, что я был разочарован - значит не сказать ничего.
val parmapTime = measureTimeMillis {
runBlocking {
val res = (1..10)
.toList()
.pmap { getDataAsync("$URL/$it") }
println(mapResult)
}
}
println("Время pmap $parmapTime мс")
Средний результат был в районе - 1523мс, что не сильно то отличалось по скорости от первого решения. Задачи может и работали параллельно благодаря map и async, но уж очень медленно.
Parallel Map v 2.0
После работы, вооружившись малиновым чаем, я сел читать документацию по корутинам и через некоторое время переписал реализацию автора.
suspend fun <T, V> Iterable<T>.parMap(func: suspend (T) -> V): Iterable<V> =
coroutineScope {
map { element ->
async(Dispatchers.IO) { func(element) }
}.awaitAll()
}
val parMapTime = measureTimeMillis {
runBlocking {
val res = (1..10)
.toList()
.parMap { getDataAsync("$URL/$it") }
}
println(res)
}
println("Параллельная map время $parMapTime мс")
После добавления контекста Dispatchers.IO задача выполнялась в 2 раза быстрее ~ 610 мс. Другое дело! Остановившись на этом варианте и дописав все до полноценного рабочего скрипта (проверка ошибок, запись в excel и т.д.) я успокоился. Но мысль в голове о том, что можно еще что-то улучшить не покидала меня.
Java ParallelStream
Через несколько дней, в одном из постов на stackowerflow прочитал о parallelStream. Не откладывая дело в долгий ящик, после работы вновь запустил IDEA.
val javaParallelTime = measureTimeMillis {
val res = (1..10).toList()
.parallelStream()
.map { getDataAsync("$URL/$it") }
res.forEach { println(it) }
}
println("Java parallelSrtream время $javaParallelTime мс")
Код выполнялся даже чуть быстрее, чем моя реализация. Но радость длилась ровно до того момента, когда пришло время обрабатывать ошибки. Точки останова насколько я понял в stream нет. Иногда, у меня получалось так, что все считалось до конца, вываливалась ошибка и в виде результата "прилетал" то неполный, то пустой Json.
Может, я делал что-то не так, но с async таких проблем не возникло. Там можно контролировать данные на каждом шаге итерации и удобно обрабатывать ошибки.
Выводы
Результаты можно посмотреть в таблице ниже. Для себя я однозначно решил оставить async await. В основном конечно из-за более простой обработки ошибок. Да и за пределы корутин тут выходить не надо.
Метод | Время (ms) |
---|---|
Асинхронный метод | 1487 |
Реализация pmap из Сети | 1523 |
Мой вариант - parallelMap | 610 |
Java.parallelStream | 578 |
getMyData (with @zeldigas) | ~ 498 |
В дальнейшем, есть мысли оформить это в небольшую библиотеку и использовать в личных целях, и конечно переписать все это с "индусского кода" на человеческий, на сколько хватит возможностей. А потом залить все это на vds.
Надеюсь мой опыт кому-нибудь пригодится. Буду рад конструктивной критике и советам! Всем спасибо
P.S.
Спасибо товарищу @zeldigas за подсказку насчет HttpClient! Как оказалось, вызов join() блокирует выполнение. Он предложил подключить kotlinx-coroutines-jdk8 с экстеншенами для CompletableFuture, но я сделал через suspendCoroutine и результат получился аналогичным.
suspend fun getMyData(httpClient: HttpClient, url: String): String? =
suspendCoroutine {
val httpRequest = HttpRequest.newBuilder().uri(URI.create(url)).build()
httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
.thenApply { obj -> it.resume(obj.body()) }
}
Наша suspendCoroutine приостанавливает выполнение корутины, дожидается результата и принимает его в методе resume.
val httpClient:HttpClient = HttpClient.newBuilder().build()
val result = measureTimeMillis {
runBlocking {
val res =(1..10).toList()
.map { async { getMyData(httpClient, "$URL/$it") } }
.awaitAll()
println(res)
}
}
println("Time for requests: $result")
В итоге, результат получился в районе 498 мс, что быстрее чем остальные варианты. Этот случай еще раз доказывает, что комментарии на Хабре порой очень полезны.
Всем спасибо! Всем пока!
UbuRus
Если async, то как же дождаться результата?
Arpanet256
А причем тут async? Или вы про — getDataAsync у автора?
UbuRus
Именно, суффикс асинк подразумевает что метод вернул какую-нибудь CompletableFuture, но нету вообще намека на то что это так. Код стал бы куда сложнее для чтения и понимания.
А если там синхронный клиент, то очень легко сделать так чтобы полностью асинхронная версия с корутинами на большем колличестве запросов вырвалась вперед (опять же зависит как клиент настроить, может там в клиенте 1 req/host установлен и тогда хоть 1000 тредов создай, 0 пользы)
Arpanet256
Если я верно понял из приведенного кода, то там HttpClient из 11 джавы, и он запрос посылает асинхронно — httpClient.sendAsync. Может он просто назвал так метод, потому что там в клиенте — sendAsync, а не просто send?
В общем тут у автора надо спросить.
Мне кажется быстрее чем async await не выйдет сделать. Или можно потоков кучу создать с синхронным клиентом?
UbuRus
Окей, запрос асинхронно но в итоге тред из CommonPool из ForkJoinPool блочится тогда. Добавляем еще запросов и оп, магический parallel stream перестал скейлиться.
Так что тесты так себе, как и идея сравнивать производительность относительно 3rd-party сервиса в интрнете.
Лучше локально поднять сервис и ввести исскуственную latency
Arpanet256
А почему тред из CommonPool из ForkJoinPool блочится? Разве при указании Dispatchers.IO тредами не рулит корутины, выделяя столько, сколько надо?
Не, можно конечно указать отдельно пул потоков, но будет ли толк?
А какое тогда более лучшее решение по параллельному получению данных есть?
UbuRus
А как это может работать без блокирования треда?) Ну кроме loom, но тут речи про него вообще не было
foxcode85 Автор
Я конечно могу поднять локальный сервер, например на ktor, но зачем? Мои проблемы код решил, и работает все относительно быстро в сравнении с первым решением или каким-нибудь HttpUrlConnection и без корутин. Да я не разработчик в принципе.
Но из-за спортивного интереса да, можно на локальном потестить. Думаю что как будет свободное время — поковыряюсь с этим.
UbuRus
Я просто никак не могу быть согласным что стримы быстрее пусть и на 100мс.
Вот я накидал кейс когда они будут намного медленнее:
Server.kt
Client1.kt
Client2.kt
Корутины (client2) выполнятся за ожидаемые 20секунд(10секунд, 1000запросов, 500 одновременных запросов ограничение клиента, у меня получилось 20730мс если точно), они упираются только в сервер и колличество сокетов.
Java вариант будет выполняться вечность (1000 запросов / 8 тредов * 10с = 20 минут, я даже подожду чтобы получить реальную цифру, подождал – 20 минут), он упрется в CommonPool (параллелизм в моем случае 8, по числу тредов), потому что не нужно использовать Stream для задач с IO, ну не нужно и все. Даже в вашем тесте попробуйте увеличить размер дата-сета который обрабатываете, и Dispatchers.IO в разы лучше справится чем стримы. Стримы в джаве это фреймворк для математических задач, но из-за отсутсвия нормальных коллекций стал использоваться как замена адекватным коллекциям.
foxcode85 Автор
Так я и не спорю — остановился на async await же. ParallelStream привел для сравнения и своего интереса ради. Кстати, иногда он был таким же по скорости, иногда больше на 30-70 мс. Это простите меня «фигня» по сравнению с удобством обработки ошибок корутин.