image

Одна из главных идей, лежащих в основе Rx, заключается в том, что неизвестно когда именно последовательность выдаст новое значение или завершится. Однако, у нас есть возможность управлять временем в которое мы начнем или закончим получать эти значения. К тому же, если наши подписчики используют внешние ресурсы, то мы вероятно захотим освободить их по окончанию некой последовательности.

Содержание:

  • Часть первая – Вступление
    1. Почему Rx?
    2. Ключевые типы
    3. Жизненный цикл подписки
  • Часть вторая – Последовательности
    1. Создание последовательности
    2. Фильтрация последовательности
    3. Исследование
    4. Агрегация
    5. Трансформация
  • Часть третья – Управление последовательностями
  • Часть четвертая – Параллельность

Подписка


Существует несколько перегруженных методов Observable::subscribe, выполняющих одну и ту же функцию

Subscription    subscribe()
Subscription    subscribe(Action1<? super T> onNext)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete)
Subscription    subscribe(Observer<? super T> observer)
Subscription    subscribe(Subscriber<? super T> subscriber)

subscribe() поглощает события, но сам по себе не выполняет непосредственных действий. Его перегруженные версии, имеющие хотя бы один параметр типа Action, создают объект Subscriber. Если не передать функции для событий onError и onCompleted, они попросту проигнорируются.

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e));
s.onNext(0);
s.onError(new Exception("Oops"));

Вывод
0
java.lang.Exception: Oops

Если не передать функцию для обработки ошибок, будет выброшено OnErrorNotImplementedException в месте, где на стороне провайдера вызван s.onError. В данном случае, провайдер[1] и потребитель[2] находятся в одном блоке кода, что позволяет использовать традиционный try-catch. Однако в реальности, провайдер и потребитель могут находится в разных местах. В таком случае, если потребитель не предоставит функцию для обработки ошибок, он никогда не узнает когда и по какой причине завершилась последовательность.

Отписка


Вы можете перестать получать данные еще до того как последовательность завершится. Все перегрузки метода subscribe возвращают объект интерфейса Subscribtion, который имеет 2 метода:

boolean isUnsubscribed()
void unsubscribe()

Вызов unsubscribe остановит поступление событий в observer.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription = values.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e),
    () -> System.out.println("Done")
);
values.onNext(0);
values.onNext(1);
subscription.unsubscribe();
values.onNext(2);

Вывод
0
1

Отписав одного подписчика, мы никак не повлияем на других подписчиков этого же ovbservable.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
    v -> System.out.println("First: " + v)
);
Subscription subscription2 = values.subscribe(
    v -> System.out.println("Second: " + v)
);
values.onNext(0);
values.onNext(1);
subscription1.unsubscribe();
System.out.println("Unsubscribed first");
values.onNext(2);

Вывод
First: 0
Second: 0
First: 1
Second: 1
Unsubscribed first
Second: 2


onError и onCompleted


onError и onCompleted означают завершение последовательности. Добросовестный observable, который следует контрактам Rx перестанет выдавать значения после наступления одного из этих событий. Это то, что следует всегда помнить при создании собственного Observable.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
    v -> System.out.println("First: " + v),
    e -> System.out.println("First: " + e),
    () -> System.out.println("Completed")
);
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);

Вывод
First: 0
First: 1
Completed


Освобождение ресурсов


Подписка удерживает в памяти ресурсы, с которыми связана. Эти ресурсы не будут автоматически освобождены при выходе объекта Subscription из области видимости. Если после вызова метода subscribe проигнорировать возвращаемое значение, то существует риск потерять единственную возможность отписаться. Подписка будет существовать далее, в то время как доступ к ней будет потерян, что может привести к утечке памяти и нежелательным действиям.

Существуют исключения. Например, при вызове перегруженных методов subscribe, неявно конструирующих объект Subscriber, будет создан механизм, который автоматически отвяжет подписчиков в момент когда завершится последовательность. Однако, даже в таком случае, следует помнить о бесконечных последовательностях. Вам все равно будет необходим объект Subscription, чтобы в какой-то момент прекратить получать данные от них.

Существует несколько реализаций интерфейса Subscription:
  • BooleanSubscription
  • CompositeSubscription
  • MultipleAssignmentSubscription
  • RefCountSubscription
  • SafeSubscriber
  • Scheduler.Worker
  • SerializedSubscriber
  • SerialSubscription
  • Subscriber
  • TestSubscriber

Мы еще встретимся с ними в будущих статьях. Стоит отметить, что Subscriber тоже реализует Subscription, что означает, что у нас также есть возможность отписаться используя ссылку на объект Subscriber.

С пониманием жизненного цикла подписок, вы получаете контроль над связанными с ними ресурсами. Это позволит вашей программе быть более предсказуемой, легко поддерживаемой и расширяемой и, надеемся, менее подверженной багам.

В следующей части мы научимся создавать и обрабатывать последовательности используя мощный инструментарий библиотеки.


[1] Тот, кто управляет (создает) observable – Примеч.
[2] Тот, кто использует значения, выданные observable – Примеч.

Комментарии (5)


  1. vedenin1980
    16.11.2015 15:37

    Введение в RxJava, часть первая

    А почему такой странный порядок нумерации частей? Уже в трех публикациях часть первая, честно говоря немного сбивает, потому что ожидаешь что часть первая это первая публикация по этой серии статей, а подсознание упорно утверждает «что где-то что-то подобное я уже видел». Тем более, те кто не видел других частей не поймут что перед первой частью надо поискать что-то ещё. ИМХО.


    1. bolein95
      16.11.2015 15:44

      Смотрите содержание под спойлером. Цикл статей довольно объемный. Так сделано чтобы избежать названий "Часть четырнадцатая – ..." и в то же время разделить весь цикл на подразделы. Возможно, это не идеальное решение. Предложите своё


      1. vedenin1980
        16.11.2015 15:59

        Не знаю, делить по разделам/главам/частям, либо «глава 1 часть 1» / «раздел 1 часть 3», либо «глава 1.1», «глава 1.2» и т.п. Впрочем, хозяин барин, конечно, я просто написал что меня, как читателя, такая необычная нумерация несколько сбивает с толку. Может быть для других это и не проблема.


        1. bolein95
          16.11.2015 17:50
          +1

          Я приму к сведению, спасибо


      1. Moxa
        16.11.2015 17:36
        +1

        м… так это три разные темы, я думал, что уже в тритий раз об одном и том же пишут >_<