Обработка событий — это цикл.

В прошлой части мы говорили об использовании thread pool executors для фоновой работы в Android. Проблема этого подхода оказалась в том, что отправляющий события знает, как должен быть обработан результат. Посмотрим теперь, что предлагает RxJava.

Дисклеймер: это не статья о том, как использовать RxJava в Android. Таких текстов в интернете и так прорва. Этот — о деталях реализации библиотеки.

Вообще говоря, RxJava — даже не инструмент конкретно для работы в фоне, это инструмент для обработки потоков событий.


Фоновая работа — это лишь один из аспектов такой обработки. Общая идея подхода в том, чтобы использовать Scheduler. Давайте посмотрим прямо в код этого класса:

public abstract class Scheduler {
  
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) { ... }
  
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { ... }
  
    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { ... } {

    @NonNull
    public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) { ... }
      
}

Довольно сложно, правда? Хорошая новость в том, что не надо самому это реализовывать! Библиотека уже включает целый ряд таких планировщиков: Schedulers.io(), Schedulers.computation(), и так далее. Всё, что требуется от вас — передать экземпляр scheduler в subscribeOn()/observeOn() метод вашей Rx-цепочки:

apiClient.login(auth)
	// some code omitted
	.subscribeOn(Schedulers.io())
	.observeOn(AndroidSchedulers.mainThread())

Затем RxJava сделает остальное за вас: возьмёт лямбды, которые вы передаете в операторы, и исполнит их на нужном планировщике.

Например, если вы хотите, чтобы ваши наблюдатели меняли пользовательский интерфейс, всё, что вам надо сделать — передать AndroidSchedulers.mainThread() в observeOn(). И дело в шляпе: больше никакой излишней связности, никакого платформо-специфичного кода, одно счастье. Конечно, AndroidSchedulers не входит в оригинальную библиотеку RxJava, а подключается в составе отдельной, но это просто ещё одна строчка в вашем build.gradle.

И что тут сложного с потоками? Хитрость в том, что нельзя просто разместить subscribeOn()/observeOn() в любом месте вашей rxChain (а было бы удобно, да?) Вместо этого приходится учесть, как эти операторы получают свои schedulers. Для начала давайте поймем, что каждый раз при вызове map, или flatMap, или filter, или ещё чего, вы получаете новый объект.

Например:

private fun attemptLoginRx() {
	showProgress(true)
	apiClient.login(auth)
			.flatMap { 
				user -> apiClient.getRepositories(user.repos_url, auth) 
			}
			.map { 
				list -> list.map { it.full_name } 
			}
			.subscribeOn(Schedulers.io())
			.observeOn(AndroidSchedulers.mainThread())
			.doFinally { showProgress(false) }
			.subscribe(
					{ list -> showRepositories(this, list)    },
					{ error -> Log.e("TAG", "Failed to show repos", error) }
			)
}

Так что тут почти каждая строчка создаёт новый объект:

// new SingleFlatMap()
val flatMap = apiClient.login(auth)
		.flatMap { apiClient.getRepositories(it.repos_url, auth) }
// new SingleMap
val map = flatMap
		.map { list -> list.map { it.full_name } }
// new SingleSubscribeOn
val subscribeOn = map
		.subscribeOn(Schedulers.io())
// new SingleObserveOn
val observeOn = subscribeOn
		.observeOn(AndroidSchedulers.mainThread())
// new SingleDoFinally
val doFinally = observeOn
		.doFinally { showProgress(false) }
// new ConsumerSingleObserver
val subscribe = doFinally
		.subscribe(
				{ list -> showRepositories(this@LoginActivity, list) },
				{ error -> Log.e("TAG", "Failed to show repos", error) }
		)
	}

И, например, SingleMap получит свой scheduler через цепочку вызовов, начинающуюся с вызова .subscribe() в конце нашей цепочки:

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(SingleObserver<? super T> subscriber) {
        ObjectHelper.requireNonNull(subscriber, "subscriber is null");

        subscriber = RxJavaPlugins.onSubscribe(this, subscriber);

        ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

        try {
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }

subsribeActual реализован для каждого Single-оператора так:

source.subscribe()

где source — оператор, предшествующий текущему, так что создается цепочка, с которой мы работаем и по которой достигаем первого созданного Single. В нашем случае это Single.fromCallable:

override fun login(auth: Authorization): Single<GithubUser> = Single.fromCallable {
	val response = get("https://api.github.com/user", auth = auth)
	if (response.statusCode != 200) {
		throw RuntimeException("Incorrect login or password")
	}

	val jsonObject = response.jsonObject
	with(jsonObject) {
		return@with GithubUser(getString("login"), getInt("id"),
				getString("repos_url"), getString("name"))
	}
}

Внутри этой лямбды мы осуществляем свои сетевые вызовы.

Но где наш scheduler? Тут, внутри SingleSubsribeOn:

@Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
        s.onSubscribe(parent);

        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }

В данном случае scheduler — тот, который мы передали в метод subsribeOn().

Весь этот код показывает, как scheduler, который мы передали в цепь, используется кодом, который мы передали в лямбды оператора.

Также обратим внимание на метод observeOn(). Он создает экземпляр класса (в нашем случае SingleObserveOn), и его subscribeActial для нас уже смотрится тривиально:

@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
    source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
}

А вот ObserveOnSingleObserver тут куда интереснее:

ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
  this.actual = actual;
  this.scheduler = scheduler;
}

@Override
public void onSuccess(T value) {
  this.value = value;
  Disposable d = scheduler.scheduleDirect(this);
  DisposableHelper.replace(this, d);
}

При вызове observeOn в scheduler-потоке оказывается вызван observer, что, в свою очередь, открывает возможность переключения потоков прямо в rxChain: можно получить данные с сервера на Schedulers.io(), затем произвести ресурсоёмкие вычисления в Schedulers.computation(), обновить UI, посчитать еще что-то, а затем просто перейти к коду в subscribe.

RxJava — довольно сложный «под капотом», очень гибкий и мощный инструмент для обработки событий (и, как следствие, управления фоновой работой). Но, по-моему, у этого подхода есть свои недостатки:

  1. На обучение RxJava уходит много времени
  2. Количество операторов, которые нужно выучить, большое, а разница между ними неочевидна
  3. Стек-трейсы вызовов для RxJava почти не имеют отношения к тому коду, который вы сами пишете

Что дальше? Разумеется, котлиновские корутины!

Предыдущие статьи серии:


От автора: уже завтра и послезавтра пройдёт конференция Mobius, где я и расскажу про корутины в Kotlin. Если серия статей заинтересовала и хочется продолжения — ещё не поздно принять решение о её посещении!

Комментарии (1)


  1. asmrnv777
    22.04.2018 19:34

    Спасибо. Есть замечание:
    В IDEA-based IDE можно выбирать текст средней кнопкой мыши, чтобы не получалось вот такого.image