RxJava как базука, большинство приложений не использует и половины её огневой мощи. В статье пойдет речь о том, как заменить её корутинами (сопрограммами) Kotlin.
Я работал с RxJava в течении нескольких лет. Это определенно одна из лучших библиотек для любого Android проекта, которая и сегодня в ударе, особенно, если вы программируете на Java. Если же вы используете Kotlin, то можно сказать, что в городе новый шериф.
Большинство использует RxJava только для того, чтобы контролировать потоки и для предотвращения callback hell (если вы не знаете, что это такое, считайте себя счастливчиком и вот почему). Дело в том, что мы должны иметь ввиду, что реальная мощь RxJava — это реактивное программирование и backpressure. Если вы используете её для контроля асинхронных запросов, вы используете базуку, чтобы убить паука. Она будет делать свою работу, но это перебор.
Одним заметным недостатком RxJava является количество методов. Оно огромно и имеет тенденцию расползаться по всему коду. В Kotlin вы можете использовать корутины для реализации большей части поведения, которое вы ранее создавали, используя RxJava.
Но… что такое корутины?
Корутин — это способ обработки конкурентных задач в потоке. Поток будет работать пока не остановлен и контекст будет меняться для каждого корутина без создания нового потока.
Корутины в Kotlin всё еще являются эксперементальными, но они вошли в Kotlin 1.3, так что я написал ниже новый класс UseCase (для clean architecture), использующий их. В этом примере, вызов корутин инкапсулирован в одном файле. Таким образом, другие слои не будут зависеть от выполняемых сопрограмм, обеспечивая более разъединенную архитектуру.
/**
* (C) Copyright 2018 Paulo Vitor Sato Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.psato.devcamp.interactor.usecase
import android.util.Log
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.android.UI
import kotlin.coroutines.experimental.CoroutineContext
/**
* Abstract class for a Use Case (Interactor in terms of Clean Architecture).
* This interface represents a execution unit for different use cases (this means any use case
* in the application should implement this contract).
* <p>
* By convention each UseCase implementation will return the result using a coroutine
* that will execute its job in a background thread and will post the result in the UI thread.
*/
abstract class UseCase<T> {
protected var parentJob: Job = Job()
//var backgroundContext: CoroutineContext = IO
var backgroundContext: CoroutineContext = CommonPool
var foregroundContext: CoroutineContext = UI
protected abstract suspend fun executeOnBackground(): T
fun execute(onComplete: (T) -> Unit, onError: (Throwable) -> Unit) {
parentJob.cancel()
parentJob = Job()
launch(foregroundContext, parent = parentJob) {
try {
val result = withContext(backgroundContext) {
executeOnBackground()
}
onComplete.invoke(result)
} catch (e: CancellationException) {
Log.d("UseCase", "canceled by user")
} catch (e: Exception) {
onError(e)
}
}
}
protected suspend fun <X> background(context: CoroutineContext = backgroundContext, block: suspend () -> X): Deferred<X> {
return async(context, parent = parentJob) {
block.invoke()
}
}
fun unsubscribe() {
parentJob.cancel()
}
}
Прежде всего, я создал родительскую задачу. Это ключ для отмены всех корутинов, которые были созданы в классе UseCase. Когда мы вызываем выполнение, важно, чтобы старые задачи были отменены, чтобы быть уверенным, что мы не упустили ни одной сопрограммы (это также произойдет, если мы отпишемся от данного UseCase).
Также, я вызываю запуск (UI). Это означает, что я хочу создать корутин, который будет выполняться в UI потоке. После этого, я вызываю фоновый метод, который создает async в CommonPool (этот подход, на самом деле, будет иметь плохую производительность). В свою очередь, async возвратит Deffered, а далее, я вызову его метод ожидания. Он ждет завершения фонового корутина, который принесет результат или ошибку.
Это может быть использовано для реализации большинства из всего того, что мы делали с RxJava. Ниже, немного примеров.
Map
Я скачал результаты searchShow и изменил их для возврата наименования первого шоу.
Код на RxJava:
public class SearchShows extends UseCase {
private ShowRepository showRepository;
private ResourceRepository resourceRepository;
private String query;
@Inject
public SearchShows(ShowRepository showRepository, ResourceRepository resourceRepository) {
this.showRepository = showRepository;
this.resourceRepository = resourceRepository;
}
public void setQuery(String query) {
this.query = query;
}
@Override
protected Single<String> buildUseCaseObservable() {
return showRepository.searchShow(query).map(showInfos -> {
if (showInfos != null && !showInfos.isEmpty()
&& showInfos.get(0).getShow() != null) {
return showInfos.get(0).getShow().getTitle();
} else {
return resourceRepository.getNotFoundShow();
}
});
}
}
Код на корутинах:
class SearchShows @Inject
constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) :
UseCase<String>() {
var query: String? = null
override suspend fun executeOnBackground(): String {
query?.let {
val showsInfo = showRepository.searchShow(it)
val showName: String? = showsInfo?.getOrNull(0)?.show?.title
return showName ?: resourceRepository.notFoundShow
}
return ""
}
}
ZIP
Zip возьмет две эмиссии от Observer и положит их вместе в новую эмиссию. Обратите внимание, что с RxJava вы должны указать выполнить вызов в параллельном использовании subscribeOn в каждом Single. Мы хотим получить оба в одно время и возвратить их вместе.
Код на RxJava:
public class ShowDetail extends UseCase {
private ShowRepository showRepository;
private String id;
@Inject
public SearchShows(ShowRepository showRepository) {
this.showRepository = showRepository;
}
public void setId(String id) {
this.id = id;
}
@Override
protected Single<Show> buildUseCaseObservable() {
Single<ShowDetail> singleDetail = showRepository.showDetail(id).subscribeOn(Schedulers.io());
Single<ShowBanner> singleBanner = showRepository.showBanner(id).subscribeOn(Schedulers.io());
return Single.zip(singleDetail, singleBanner, (detail, banner) -> new Show(detail,banner));
}
Код на корутинах:
class SearchShows @Inject
constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) :
UseCase<Show>() {
var id: String? = null
override suspend fun executeOnBackground(): Show {
id?.let {
val showDetail = background{
showRepository.showDetail(it)
}
val showBanner = background{
showRepository.showBanner(it)
}
return Show(showDetail.await(), showBanner.await())
}
return Show()
}
}
FlatMap
В этом случае, я ищу шоу которые имеют строку запроса и для каждого результата (ограниченно 200 результатами), я получаю также рейтинг шоу. В конце, я возвращаю список шоу с соответствующими рейтингами.
Код на RxJava:
public class SearchShows extends UseCase {
private ShowRepository showRepository;
private String query;
@Inject
public SearchShows(ShowRepository showRepository) {
this.showRepository = showRepository;
}
public void setQuery(String query) {
this.query = query;
}
@Override
protected Single<List<ShowResponse>> buildUseCaseObservable() {
return showRepository.searchShow(query).flatMapPublisher(
(Function<List<ShowInfo>, Flowable<ShowInfo>>) Flowable::fromIterable)
.flatMapSingle((Function<ShowInfo, SingleSource<ShowResponse>>)
showInfo -> showRepository.showRating(showInfo.getShow().getIds().getTrakt())
.map(rating -> new ShowResponse(showInfo.getShow().getTitle(), rating
.getRating())).subscribeOn(Schedulers.io()),
false, 4).toList();
}
}
Код на корутинах:
class SearchShows @Inject
constructor(private val showRepository: ShowRepository) :
UseCase<List<ShowResponse>>() {
var query: String? = null
override suspend fun executeOnBackground(): List<ShowResponse> {
query?.let { query ->
return showRepository.searchShow(query).map {
background {
val rating: Rating = showRepository.showRating(it.show!!.ids!!.trakt!!)
ShowResponse(it.show.title!!, rating.rating)
}
}.map {
it.await()
}
}
return arrayListOf()
}
}
Разрешите мне объяснить. Используя RxJava, моё хранилище возвращает одиночную эмиссию List, таким образом мне требуется несколько эмиссий, по одной на каждую ShowInfo. Чтобы сделать это, я вызвал flatMapPublisher. Для каждой эмиссии я должен выделить ShowResponse, и в конце собрать их все в список.
Мы заканчиваем с такой конструкцией: List foreach > (ShowInfo > ShowRating > ShowResponse) > List.
С корутинами я сделал map для каждого элемента List, чтобы конвертировать его в список List<Deffered>.
Как вы можете видеть, большая часть того, что мы сделали с RxJava проще реализовать с синхронными вызовами. Корутины могут обрабатывать даже flatMap, которые, как я верю, одни из самых сложных функций в RxJava.
Хорошо известно, что корутины могут быть легковесными (здесь пример), но результаты озадачили меня. В этом примере, RxJava запускалась примерно 3.1 секунды, тогда как корутины потребовали примерно 5.8 секунд для запуска на CommonPool.
Эти результаты поставили передо мной вопрос, что в них может быть что-то неуместное. Позже, я нашел, что это. Я использовал retrofit Call, который блокировал поток.
Для исправления данной ошибки есть два пути, выбор зависит от того, какой версией Android Studio вы пользуетесь. В версии Android Studio 3.1 мы должны убедиться, что мы не блокируем фоновый поток. Для этого, я использовал данную библиотеку:
implementation 'ru.gildor.coroutines:kotlin-coroutines-retrofit:0.12.0'
Данный код создает расширение функции retrofit Call для приостановки потока:
public suspend fun <T : Any> Call<T>.await(): T {
return suspendCancellableCoroutine { continuation ->
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>?, response: Response<T?>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
continuation.resumeWithException(
NullPointerException("Response body is null: $response")
)
} else {
continuation.resume(body)
}
} else {
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<T>, t: Throwable) {
// Don't bother with resuming the continuation if it is already cancelled.
if (continuation.isCancelled) return
continuation.resumeWithException(t)
}
})
registerOnCompletion(continuation)
}
}
В Android Studio 3.2, вы можете обновить библиотеку корутин до версии 0.25.0. Эта версия имеет CoroutineContext IO (вы можете видеть соответствующий комментарий в моем классе UseCase).
Выполнение на CommonPool без блокирующего вызова заняло 2.3 секунды и 2.4 секунды с IO и блокирующими вызовами.
Я надеюсь данная статья вдохновит вас на использование корутин, более легковесной и, возможно, более быстрой альтернативе RxJava и немного облегчит понимание того, что вы пишете синхронизированный код который выполняется асинхронно.
Комментарии (8)
vba
31.08.2018 09:54А как насчет back pressure в корутинах?
andrekotishe Автор
31.08.2018 20:04Корутины умеют приостанавливаться и буферизовать элемент потока до тех пор, пока предыдущий элемент не будет обработан, поэтому они автоматически решают проблему backpressure. Стоит глянуть вот этот пример, в котором это наглядно показано.
kapmayn
31.08.2018 20:10я понимаю, что хочется показать, как корутины круты, но делать так, как это делаете вы, явно не стоит: почему во всех примерах с rxjava у вас конструктор и set прописаны отдельно, хотя для корутин всё сделано адекватно? это же делает код раза в два больше
p.s. можно не отклонять мой комментарий, если ответа нет, а просто исправить ваши примерыandrekotishe Автор
31.08.2018 20:11Я верю, что код примеров можно улучшить, но в данном случае, это перевод и мной сохранена стилистика автора.
Mishkun
01.09.2018 21:22Автор — "Использование rxJava для вызова асинхронных методов — оверхед"
Тоже автор — "Тут я написал базовый класс для UseCase"
imanushin
В корутинах есть важное преимущество перед операторами в RxJava/Reactor: все операторы могут быть асинхронными. Т.е. можно написать что-то вроде:
Если
loadRemoteData
асинхронный, то можно делать неблокирующий pipe.Однако есть еще важное отличие тех же корутин от RxJava/Reactor: корутины создаются "горячими" (сравнение горячих и холодных подписок). То есть когда вы возвращаете
produce { ... }
, вы можете начать публиковать данные до того, как на канал подпишутся.В отличии от Kotlin Channels, в классических холодных Observable вы возвращаете класс, на который:
dmdev
В RxJava для этих целей есть Connectable Observable и Subject