О том, как удобно писать на Rx, какие операторы и сущности в нём есть, как делать запросы в сеть и работать с многопоточностью, не писал разве что ленивый. Кто-то рассказывает, что можно «обмазаться» Rx-ом на всех уровнях архитектуры и приложение станет реактивным, а разработка — в разы быстрее. Но как только слова доходят до дела, то встаёт множество вопросов, как подружить Rx со спецификой UI и системными API и зачем нужен Rx, кроме как для многопоточности. В этой статье я хотел бы рассказать о нашем видении разработки в реактивном стиле и на реальных примерах показать, как он упрощает жизнь.


Наверное, многие видели доклад Джейка Вортона про RxJava2. Всем, кто не смотрел, категорически советую. В нём не столь важно описание того, что умеет RxJava и какие объекты там есть. В этом докладе Вортон показал проблему нереактивных подходов к разработке — неочевидность потоков обработки данных, отсутствие чёткого и однонаправленного пути, по которому идут события, начиная от пользовательского ввода и заканчивая изменением интерфейса, которое повлекли действия пользователя.


И эта проблема очень сильно прослеживается в MVP. То, что Presenter и View имеют ссылки друг на друга, ставит крест на однонаправленности данных; начинаются споры о том, насколько View должна быть пассивной, а в это время дебаггером приходится гулять по коду от одного метода к другому, чтобы отследить цепочку действий. Эту проблему начинают решать MVVM и подобные подходы, и мы пошли по их пути.


Команда Android-разработки FunCorp вдохновилась докладом Вортона и решила попробовать написать абсолютно всё на Rx. Изначально такой цели не было, но по ходу дела мы поняли, что использование реактивного подхода в тех местах, где он кажется очень странным, сильно упрощает жизнь и делает код очевиднее.


Первое, что мы сделали, это подключили RxBinding — теперь наши View умеют в Rx. Это начало любого действия в приложении, то есть начало цепочки обработки данных.


implementation "com.jakewharton.rxbinding3:rxbinding-core:$rx_binding_version"

Мы решили, что View будет максимально пассивной и не будет самостоятельно инициировать никаких действий.


У View есть два типа методов:


  • изменения какого-либо свойства;
  • наблюдатель на событие пользовательского ввода.

class AuthFullRegistrationView @Inject constructor(): BaseArchView() {

   fun doneClick(): Observable<Unit> = viewHolder.doneBtn.clicks()
   fun loginClick(): Observable<Unit> = viewHolder.loginBtn.clicks()
   fun nicknameText(): Observable<CharSequence> = viewHolder.nickname.textChanges()
   fun passwordText(): Observable<CharSequence> = viewHolder.password.textChanges()
   fun emailText(): Observable<CharSequence> = viewHolder.email.textChanges()

   fun setNickname(nickname: String?) {
       viewHolder.nickname.setText(nickname)
   }

   fun setNicknameError(enabled: Boolean, text: String? = null) {
       viewHolder.nicknameRegistrationView.error = text
       viewHolder.nicknameRegistrationView.isErrorEnabled = enabled
   }

   fun setEmailError(enabled: Boolean, text: String? = null) {
       viewHolder.emailRegistrationView.error = text
       viewHolder.emailRegistrationView.isErrorEnabled = enabled
   }

   fun setPasswordError(enabled: Boolean, text: String? = null) {
       viewHolder.passwordRegistrationView.error = text
       viewHolder.passwordRegistrationView.isErrorEnabled = enabled
   }
}



Такие View становятся максимально переиспользуемыми, так как ни от кого не зависят (чего нельзя сказать о View в MVP).


Бизнес-логика и пользовательские действия соединяются в Presenter. Presenter подписывает изменение доменной модели на события от View. В onNext/OnError View обновляется исходя из итогового состояния модели. В самом простом варианте это выглядит так:


archView.loginClick()
   .subscribe { authNavigationController.goToLogin(LoginParams()) }

Сразу встаёт вопрос о том, как отписывать все эти цепочки действий при уходе с экрана. Он решается через CompositeDisposable и экстеншен-функцию.


abstract class SimpleArchPresenter<V: ArchView> {

   var args: Any? = null
   var state: Bundle? = null

   val compositeDisposable = CompositeDisposable()

   @CallSuper
   open fun bindIntents(archView: V) {
   }

   @CallSuper
   open fun unbindIntents() {
       compositeDisposable.clear()
   }
}

fun Disposable.addToDisposable(compositeDisposable: CompositeDisposable): Disposable {
   compositeDisposable.add(this)
   return this
}

Теперь наша цепочка приобретает следующий вид:


archView.loginClick()
   .subscribe { authNavigationController.goToLogin(LoginParams()) }
   .addToDisposable(compositeDisposable)

Появляются более сложные сценарии обработки. Например, регистрация пользователя:


  • клик по кнопке Sign in;
  • считываем имя пользователя и пароль из полей ввода
  • запрос к API;
  • в случае успеха переходим на следующий экран или показываем ошибку.

private fun bindRegistrationFlow(archView: AuthFullRegistrationView) {
   val handleFields = Observable
       .zip(archView.nicknameText(), archView.emailText(), archView.passwordText(),
           Function3{ nicknameText: CharSequence, emailText: CharSequence, passwordText: CharSequence ->
               FullRegistrationInteractor.RegisterFields(nicknameText, passwordText, emailText)
           })
       .take(1)

   archView.doneClick()
       .flatMap { handleFields }
       .flatMap {
           fullRegistrationInteractor.registration(it)
                   .subscribeOn(Schedulers.io())
       }
       .safeResponseSubscribe({ authNavigationController.goToVerifyNewEmail() },
                              { handleError(archView, it) })
       .addToDisposable(compositeDisposable)
}

Далее мы понимаем, что пока идёт запрос в сеть, нам нужно показать индикатор прогресса. В классическом варианте показ и скрытие диалога находятся в разных местах и разбивают цепочку действий.


У нас есть следующий экстеншен:


/**
* if we hide progress with delay and after that this observable is completed, doOnDispose not
* called and we dont reset delayed progress
*/
fun <E, T> Observable<E>.withProgress(progress: ProgressDelegate, action: (data: E) -> Observable<T>): Observable<T> {
  return this.flatMap {
     action.invoke(it)
           .doOnSubscribe { progress.show() }
           .observeOn(AndroidSchedulers.mainThread())
           .doFinally {
              progress.hide()
           }
  }
}

interface ProgressDelegate {

  fun show()
  fun hide()
}

Внутрь функции передаётся Observable, который оборачивается в показ прогресса на onSubscribe и его скрытие на onFinally.


Дефолтная реализация прогресса представлена ниже.


ProgressDialogManager
class ProgressDialogManager constructor(activity: Activity, defaultMessage: CharSequence) : ProgressDelegate {

  private val progressDialog = ProgressDialog(activity)

  init {
     progressDialog.setCancelable(false)
     progressDialog.setMessage(defaultMessage)
  }

  override fun show() {
     if (progressDialog.isShowing) return
     progressDialog.show()
  }

  override fun hide() {
     if (!progressDialog.isShowing) return
     progressDialog.dismiss()
  }
}



Но и также есть реализации ProgressDelegate, добавляющие в RecycleView в конец списка элемент с прогрессом:


private class StatisticsServersListProgressDelegate(private val statisticsServersAdapter: StatisticsServersAdapter) : ProgressDelegate {

  override fun show() {
     with(statisticsServersAdapter) {
        transaction {
           clear()
           add(ArchAdapterItem(PROGRESS_VIEW_TYPE.toString(),
                               SimpleProgressItemData(isFooter = false),
                               PROGRESS_VIEW_TYPE))
        }
     }
  }

  override fun hide() {
     statisticsServersAdapter.removeItem(PROGRESS_VIEW_TYPE.toString())
  }
}


Или, например, экстеншен, добавляющий задержку на показ любой реализации ProgressDelegate, позволяющий убрать мелькания диалогов на коротких действиях.


DelayedProgressDelegate
class DelayedProgressDelegate internal constructor(private val progressDelegate: ProgressDelegate,
                                                  private val delayTimeMillis: Long = SHOW_DELAY_MILLIS) : ProgressDelegate {

  companion object {
     private const val SHOW_DELAY_MILLIS = 800L
  }

  enum class ProgressState {
     SHOW,
     HIDE
  }

  private val stateChangeListener = object : StateMachine.StateChangeListener<ProgressState> {
     override fun onStateChanged(oldState: ProgressState, newState: ProgressState) {
        when (newState) {
           ProgressState.SHOW -> progressDelegate.show()
           ProgressState.HIDE -> progressDelegate.hide()
        }
     }
  }

  private val stateMachine = StateMachine(ProgressState.HIDE, stateChangeListener)

  override fun show() {
     stateMachine.gotoState(ProgressState.SHOW, delayTimeMillis, true)
  }

  override fun hide() {
     stateMachine.gotoState(ProgressState.HIDE, 0, true)
  }

  fun reset() {
     stateMachine.clear()
  }
}

Этот экстеншен можно использовать с абсолютно любыми реализациями диалогов.


В итоге вышеописанный пример принимает следующий вид:


private fun bindRegistrationFlow(archView: AuthFullRegistrationView) {
   val handleFields = Observable
       .zip(archView.nicknameText(), archView.emailText(), archView.passwordText(),
           Function3{ nicknameText: CharSequence, emailText: CharSequence, passwordText: CharSequence ->
               FullRegistrationInteractor.RegisterFields(nicknameText, passwordText, emailText)
           })
       .take(1)

   archView.doneClick()
       .flatMap { handleFields }
       .withProgress(progressDialogManager.delayed())  {  // flatMap -> withProgress
           fullRegistrationInteractor.registration(it)
                   .subscribeOn(Schedulers.io())
       }
       .safeResponseSubscribe({ authNavigationController.goToVerifyNewEmail(VerifyEmailParams(archView.getEmailText())) },
                              { handleError(archView, it) })
       .addToDisposable(compositeDisposable)
}

В этот момент мы поняли, что любые диалоги проще показывать через Rx.


fun createSimpleDialog(@StringRes messageId: Int, @StringRes positiveTitle: Int, @StringRes negativeTitle: Int,
                      postCreateDialogAction: (dialog: AlertDialog) -> Unit = {}): Observable<Boolean> {
  val result = BehaviorSubject.create<Boolean>()
  val dialog = AlertDialog.Builder(activity, styleId)
        .setMessage(messageId)
        .setPositiveButton(positiveTitle) { _, _ -> result.onNext(true) }
        .setNegativeButton(negativeTitle) { _, _ -> result.onNext(false) }
        .create()
  dialog.setOnDismissListener { result.onComplete() }
  return result.doOnSubscribe {
     dialog.show()
     postCreateDialogAction.invoke(dialog)
  }
        .subscribeOn(AndroidSchedulers.mainThread())
        .doOnDispose { dialog.dismiss() }
}

Вся цепочка удаления сервера принимает следующий вид:


private fun bindDeleteMenuItem(archView: CurrentServerArchView, server: ServerEntity) {
   archView.serverMenuDeleteClicks()
           .concatMap {
               alertDialogRxFactory.createSimpleDialog(R.string.delete_server_alert_message,
                                                       R.string.common_yes,
                                                       R.string.common_no)
           }
           .filter { it }
           .withProgress(progressDialogManager.delayed()) {
               deleteServerInteractor.deleteServer(server.id)
                       .subscribeOn(Schedulers.io())
           }
           .safeResponseSubscribe(onError = {
               errorDialogProvider.showModelError(it)
           })
           .addToDisposable(serverActions)
}

archView.serverMenuDeleteClicks — это клик по элементу меню, который тоже обрабатывается реактивно.


ActionMenuArchView
class ActionMenuArchView @Inject constructor(private val activity: Activity) : BaseArchView() {

  val items: List<MenuItem> get() = menuView.menu.items()
  val menuClicks: Observable<MenuItem> by lazy { menuView.menuClicks().share() }

  private val menuView: ActionMenuView get() = viewHolder.view as ActionMenuView

  fun inflateMenu(@MenuRes menu: Int): List<MenuItem> {
     menuView.menu.clear()
     activity.menuInflater.inflate(menu, menuView.menu)
     return items
  }

  fun itemClicks(@IdRes itemId: Int): Observable<MenuItem> =
        menuClicks.filter { it.itemId == itemId }
}

Для примера, показ диалогов с календарём:


fun openCalendar(activity: Activity): Observable<Date> {
   val result = BehaviorSubject.create<Date>()
   val dateCalendar = Calendar.getInstance()
   dateCalendar.timeInMillis = System.currentTimeMillis()

   val dialog = DatePickerDialog(activity,
       DatePickerDialog.OnDateSetListener { _, year, month, dayOfMonth ->
           val calendar = Calendar.getInstance()
           calendar.set(year, month, dayOfMonth)
           result.onNext(calendar.time)
           result.onComplete()
       },dateCalendar.get(Calendar.YEAR),
       dateCalendar.get(Calendar.MONTH),
       dateCalendar.get(Calendar.DAY_OF_MONTH)
   )
   dialog.setOnDismissListener { result.onComplete() }
   dialog.show()
   return result.doOnDispose { dialog.dismiss() }
}

Или выбором пола:


fun openSexChooser(activity: Activity): Observable<String> {
   val result = BehaviorSubject.create<String>()
   val items = arrayOf("Male", "Female")
   val builder = AlertDialog.Builder(activity)
   builder.setItems(items) { _, which ->
       result.onNext(items[which])
       result.onComplete()
   }

   val alertDialog = builder.create()
   alertDialog.setOnDismissListener { result.onComplete() }
   alertDialog.show()
   return result.doOnDispose { alertDialog.dismiss() }
}

Правильнее показывать диалог в doOnSubscribe, но никто не идеален:)

Почему это очень удобно?


  • диалог полностью отвязывается от жизненного цикла;
  • диалог полностью отвязывается от действий, которые он инициирует, и его можно легко переиспользовать;
  • диалог остаётся частью цепочки действий, а не разбивает её на «до» и «после» и не теряется контекст действия;
  • как результат предыдущего пункта — не падает читаемость кода.

Жизненный цикл


Далее можно обернуть вызовы методов жизненного цикла. Зачем? Например, чтобы повторить поведение, аналогичное LiveData, т.е. чтобы не обновлять UI между onPause и onResume.


@ActivityScope
class ActivityLifecycleDispatcher @Inject constructor() {
   private val isResumedSubject = BehaviorSubject.create<Boolean>()

   fun isResumed(): Observable<Boolean> = isResumedSubject

   fun onResumed(isResumed: Boolean) {
       isResumedSubject.onNext(isResumed)
   }
}

private fun startServers() {
  updateServersDisposable?.dispose()
  updateServersDisposable = visibleScreen()
        .flatMap { serversInteractor.load().repeatWhen { it.delay(UPDATING_PERIOD_SECONDS, TimeUnit.SECONDS) } }
        .subscribeOn(Schedulers.io())
        .subscribe()
}

private fun visibleScreen() = activityLifecycleDispatcher.isResumed().filter { it }.take(1)

Здесь мы периодически обновляем список серверов, но только если экран виден пользователю.


Таким же образом можно обрабатывать onNewIntent.


@ActivityScope
class ActivityResultDispatcher @Inject constructor() {
   private val onActivityResultSubject = PublishSubject.create<OnActivityResultInfo>()

   fun onActivityResult(requestCode: Int): Observable<OnActivityResultInfo> = onActivityResultSubject.filter { it.requestCode == requestCode }

   fun handleOnActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
       onActivityResultSubject.onNext(OnActivityResultInfo(requestCode, resultCode, data))
   }
}

data class OnActivityResultInfo constructor(val requestCode: Int, val resultCode: Int, val data: Intent?)

Например, цепочка действий при возврате в галерею после создания нового канала:


activityResultDispatcher.onActivityResult(ServerChannelsListPresenter.CREATE_CHANNEL_REQUEST_CODE)
     .filter { it.resultCode == Activity.RESULT_OK }
     .map { it.data!!.extras!!.revealNavigationParams<ChannelSettingsFinishParams>()!! }
     .flatMap {
        getChannelInteractor.get(it.channelId)
              .take(1)
              .subscribeOn(Schedulers.io())
     }
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe {
        if (it.data != null) {
           galleryNavigator.openChannel(it.requireData)
           mainMenuVisibilityController.closeImmediately()
        }
     }
     .addToDisposable(compositeDisposable)

  • получили необходимый onActivityResult;
  • развернули параметры из интента;
  • загрузили ожидаемый канал;
  • на главном потоке открыли этот канал и скрыли меню.

Аналогично мы обрабатываем запросы разрешений, onNewIntent, остальные методы жизненного цикла, появление клавиатуры, клик по клавише громкости и прочие системные события.


Для обработки ошибок сети можно написать механизм повторения запросов при ошибках либо ожидания сети:


fun <T> Observable<SafeResponse<T>>.repeatOnNetwork(networkController: NetworkController): Observable<SafeResponse<T>> {
    return this
        .flatMap {
            if (it.exception is ServerException) {
                Observable.just(it)
            }
            Observable.error<SafeResponse<T>>(Exception())
        }
        .retryWhen {
            it.flatMap { networkController.networkState }
        }
}

Работа с плеером


RxBinding не поддерживает методы ExoPlayer, поэтому пришлось написать пару десятков экстеншенов, чтобы поддержать работу с плеером в таком же стиле:


archView.renderFirstFrame()
     .take(1)
     .subscribe { trackViewed() }
     .addToDisposable(contentDisposable)

archView.playWhenReadyState()
     .subscribe { onChangePlayWhenReadyState(it) }
     .addToDisposable(contentDisposable)

archView.videoSize()
     .take(1)
     .subscribe { archView.setSize(it.height, it.width) }
     .addToDisposable(contentDisposable)

archView.playerState()
     .distinctUntilChanged()
     .subscribe { onChangePlayerState(it, archView) }
     .addToDisposable(contentDisposable)

SimpleExoPlayer.renderFirstFrame
fun SimpleExoPlayer.renderFirstFrame(): Observable<Unit> = ExoPlayerRenderFirstFrameObservable(this)

private class ExoPlayerRenderFirstFrameObservable(private val view: SimpleExoPlayer) : Observable<Unit>() {

   override fun subscribeActual(observer: Observer<in Unit>) {
       val listener = Listener(observer)

       val disposable = object : MainThreadDisposable() {
           override fun onDispose() {
               view.removeVideoListener(listener)
           }
       }
       observer.onSubscribe(disposable)
       view.addVideoListener(listener)
   }

   private class Listener(private val observer: Observer<in Unit>) : VideoListener {
       override fun onRenderedFirstFrame() {
           observer.onNext(Unit)
       }
   }
}

Аналогично мы написали все остальные экстеншены для ExoPlayer.


Работа с RecycleView


Адаптер, который мы используем у себя в проектах:


ArchRecyclerViewAdapter
abstract class ArchRecyclerViewAdapter constructor(diffExecutor: Executor,
                                                  private val componentsFactory: ArchRecycleComponentsFactory) : RecyclerView.Adapter<ArchRecycleViewHolder>() {

  /** this items are update after diff is applied. They may be used when you items adapter is currently showing */
  val adapterItems: List<ArchAdapterItem<out Any>> get() = differ.currentList
  /** this items are updated on [update] call. They represent the future state of this adapter and may be used as a cache for further updates*/
  var dataItems: List<ArchAdapterItem<out Any>> = emptyList()
     private set

  private val differ = AsyncListDifferFactory(this, diffExecutor).create()

  fun update(newList: List<ArchAdapterItem<out Any>>?) {
     dataItems = newList ?: emptyList()
     differ.submitList(newList)
  }

  @Suppress("unused")
  fun transaction(transaction: MutableList<ArchAdapterItem<out Any>>.() -> Unit) {
     update(dataItems.toMutableList().apply(transaction))
  }

  override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ArchRecycleViewHolder {
     val view = componentsFactory.inflateView(viewType, parent)
     return componentsFactory.createViewHolder(viewType, view)!!
  }

  override fun onBindViewHolder(holder: ArchRecycleViewHolder, position: Int) {
     getBinder(holder).bind(holder, adapterItems[position].data)
  }

  override fun onViewRecycled(holder: ArchRecycleViewHolder) {
     super.onViewRecycled(holder)
     getBinder(holder).unbind(holder)
  }

  override fun onViewDetachedFromWindow(holder: ArchRecycleViewHolder) {
     super.onViewDetachedFromWindow(holder)
     getBinder(holder).detach(holder)
  }

  override fun onViewAttachedToWindow(holder: ArchRecycleViewHolder) {
     super.onViewAttachedToWindow(holder)
     getBinder(holder).attach(holder)
  }

  override fun getItemViewType(position: Int) = adapterItems[position].viewType
  override fun getItemCount() = adapterItems.size

  private fun getBinder(holder: ArchRecycleViewHolder) = componentsFactory.createViewBinder(holder.itemViewType) as ArchRecycleViewBinder<ArchRecycleViewHolder, Any?>
}

interface ArchRecycleViewBinder<V: ArchRecycleViewHolder, D> {
   fun bind(holder: V, data: D)
   fun unbind(holder: V) {}

   fun attach(holder: V) {}
   fun detach(holder: V) {}
}

Если коротко, то есть ArchRecycleViewBinder, который в методе bind связывает данные и холдер. На каждый тип элемента имеется один общий на элементы этого типа биндер. Он возвращает действия пользователя, которые инициируются внутри холдеров адаптера. View проксирует эти методы в Presenter.


class ServerMemberViewBinder @Inject constructor() : ArchRecycleViewBinder<ServerMemberViewHolder, ServerMemberItemData> {

   private val memberClickSubject = BehaviorSubject.create<ServerMember>()
   private val roleMenuClickSubject = BehaviorSubject.create<ServerMember>()
   private var roleSettingsEnable: Boolean = false

   fun memberClicks(): Observable<ServerMember> = memberClickSubject
   fun roleMenuClicks(): Observable<ServerMember> = roleMenuClickSubject

   fun setRoleSettingsEnable(enable: Boolean) {
       roleSettingsEnable = enable
   }

   override fun bind(holder: ServerMemberViewHolder, data: ServerMemberItemData) {
       val serverMember = data.serverMember

       ViewUtils.setViewVisibility(holder.moreMenu,
                                   roleSettingsEnable && serverMember.role != ServerRole.OWNER)

       holder.itemView.clicks()
               .map { serverMember }
               .subscribe(memberClickSubject)
       holder.moreMenu.clicks()
               .map { serverMember }
               .subscribe(roleMenuClickSubject)
   }
}



Вывод


В подобном стиле у нас написано ещё множество других частей приложения: обработка диплинков, работа с редактированием изображений, туториалы по ходу работы приложения, шаринг, пагинация и т.д. Самое удивительное, что порог вхождения в подобную архитектуру остается довольно низким. Большая часть задач решается минимальным количеством операторов, а, например, Room уже из коробки поддерживает Rx, и достаточно просто подписаться на данные, чтобы получать уведомления об их изменении. Довольно быстро разработчик перестраивается на такой стиль написания кода и начинает рассматривать любое действие в реактивном стиле, код становится читабельнее, а писать его быстрее.


Буду искренне рад любому фибдеку и комментариям об уместности использования реактивного подхода в разных слоях архитектуры приложения.