* от переводчика: я долго думал над тем, как перевести на русский язык глагол «to grok». С одной стороны, это слово переводится как «понять» или «осознать», а с другой стороны, при переводе романа Роберта Хайнлайна «Чужак в чужой стране» (в котором это слово впервые и появилось на свет), переводчики сделали из него русское «грокать». Роман я не читал, поэтому счёл, что есть у этого слова какие-то смысловые оттенки, которые русскими аналогами не передавались, а посему в своём переводе использовал ту же самую кальку с английского.

RxJava — это, сейчас, одна из самых горячих тем для обсуждения у Android-программистов. Единственная проблема состоит в том, что понять самые её основы, если вы не сталкивались ни с чем подобным, может быть довольно затруднительно. Функциональное реактивное программирование довольно сложно понять, если вы пришли из императивного мира, но, как только вы разберётесь с ним, вы поймёте, насколько же это круто!
Я постараюсь дать вам некое общее представление об RxJava. Задача этого цикла статей состоит не в том, чтобы объяснить всё вплоть до последней запятой (вряд ли я смог бы это сделать), но, скорее в том, чтобы заинтересовать вас RxJava, и тем, как она работает.

Основы


Базовыми строительными блоками реактивного кода являются Observables и Subscribers1. Observable является источником данных, а Subscriber — потребителем.
Порождение данных через Observable всегда происходит в соответствии с одним и тем же порядком действий: Observable «излучает» некоторое количество данных (в том числе, Observable может ничего и не излучать), и завершает свою работу — либо успешно, либо с ошибкой. Для каждого Subscriber, подписанного на Observable, вызывается метод Subscriber.onNext() для каждого элемента потока данных, после которого может быть вызван как Subscriber.onComplete(), так и Subscriber.onError().
Всё это очень похоже на обычный паттерн «Наблюдатель», но есть одно важное отличие: Observables часто не начинают порождать данные до тех пор, пока кто-нибудь явно не подписывается на них2. Другими словами: если дерево падает, а рядом никого нет, значит звук его падения не слышен.

Здравствуй, мир!


Давайте разберёмся с небольшим примером. Сначала создадим простой Observable:

Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);

Наш Observable порождает строку «Hello, world!», и завершает свою работу. Теперь создадим Subscriber для того, чтобы принять данные и что-нибудь с ними сделать.

Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
};

Всё, что делает Subscriber — печатает строки, переданные ему Observable. Теперь, когда у нас есть myObservable и mySubscriber, мы можем связать их вместе, воспользовавшись методом subscribe():

myObservable.subscribe(mySubscriber);
// Выводит "Hello, world!"

Как только мы подписали mySubscriber на myObservable, myObservable вызывает у mySubscriber методы onNext() и onComplete(), в результате чего mySubscriber выводит в консоль «Hello, world!», и завершает своё выполнение.

Упрощаем код


Вообще говоря, мы написали слишком много кода для такой простой задачи, как вывод «Hello, world!» в консоль. Я специально написал этот код таким образом, чтобы вы могли легко разобраться, что тут к чему. В RxJava есть много более рациональных способов решить подобную задачу.
Во-первых, давайте упростим наш Observable. В RxJava существуют методы создания Observable, подходящих для решения наиболее типовых задач. В нашем случае, Observable.just() порождает один элемент данных, а потом завершает своё выполнение, точно так же как и наш первый вариант3:

Observable<String> myObservable = Observable.just("Hello, world!");

Далее, давайте-ка упростим наш Subscriber. Нас не интересуют методы onCompleted() и onError(), так что мы можем использовать другой базовый класс для определения того, что нужно сделать в onNext():

Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};

Action может быть использован для замены любой части Subscriber: Observable.subscribe() может принять один, два или три Action-параметра, которые будут выполняться вместо onNext(), onError() и onCompete(). То есть, мы можем заменить наш Subscriber вот так:

myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);

Но, так как нам не нужны onError() и onCompete(), мы можем упростить код ещё больше:

myObservable.subscribe(onNextAction);
// Выводит "Hello, world!"

Теперь давайте избавимся от переменных, прибегнув к цепочечному вызову методов:

Observable.just("Hello, world!")
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
              System.out.println(s);
        }
    });

Ну и, наконец, мы можем воспользоваться лямбдами из Java 8, чтобы упростить код ещё больше:

Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s));

Если вы пишете под Android (и поэтому не можете использовать Java 8), я очень рекомендую retrolambda, которая поможет упростить очень уж многословный в некоторых местах код.

Трансформация


Давайте попробуем нечто новое.
Например, я хочу добавить свою подпись к «Hello, world!», выводимому в консоль. Как это сделать? Во-первых, мы можем изменить наш Observable:

Observable.just("Hello, world! -Dan")
    .subscribe(s -> System.out.println(s));

Это может сработать, если вы имеете доступ к исходному коду, в котором определяется ваш Observable, но это не всегда будет так — например, когда вы используете чью-то библиотеку. Другая проблема: что, если мы используем наш Observable во многих местах, но хотим добавлять подпись только в некоторых случаях?
Можно попробовать переписать Subscriber:

Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s + " -Dan"));

Такой вариант тоже является неподходящим, но уже по другим причинам: я хочу, чтобы мои подписчики были настолько легковесными, насколько это возможно, так как я могу запускать их в главном потоке. На более концептуальном уровне, подписчики должны реагировать на поступающие в них данных, а не изменять их.
Было бы здорово, если можно было изменить «Hello, world!» на некотором промежуточном шаге.

Введение в операторы


И такой промежуточный шаг, предназначенный для трансформации данных, есть. Имя ему — операторы, и они могут быть использованы в промежутке между Observable и Subscriber для манипуляции данными. Операторов в RxJava очень много, поэтому для начала лучше будет сосредоточиться лишь на некоторых.
Для нашей конкретной ситуации лучше всего подошёл бы оператор map(), через который можно преобразовывать один элемент данных в другой:

Observable.just("Hello, world!")
    .map(new Func1<String, String>() {
        @Override
        public String call(String s) {
            return s + " -Dan";
        }
    })
    .subscribe(s -> System.out.println(s));

И снова можно прибегнуть к лямбдам:

Observable.just("Hello, world!")
    .map(s -> s + " -Dan")
    .subscribe(s -> System.out.println(s));

Круто, да? Наш оператор map(), грубо говоря, это Observable, который трансформирует поступающий в него элемент данных. Мы можем создать цепочку из такого количества map(), какое посчитаем нужным для того, чтобы придать данным наиболее удобную и простую форму, чтобы облегчить задачу нашему Subscriber.

Ещё кое-что о map()


Интересным свойством map() является то, что он не обязан порождать данные того же самого типа, что и исходный Observable.
Допустим, что наш Subscriber должен выводить не порождаемый текст, а его хэш:

Observable.just("Hello, world!")
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return s.hashCode();
        }
    })
    .subscribe(i -> System.out.println(Integer.toString(i)));

Интересно: мы начали со строк, а наш Subscriber принимает Integer. Кстати, мы опять забыли о лямбдах:

Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .subscribe(i -> System.out.println(Integer.toString(i)));

Как я говорил ранее, мы хотим, чтобы наш Subscriber делал как можно меньше работы, поэтому давайте применим ещё один map(), чтобы сконвертировать наш хэш обратно в String:

Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .map(i -> Integer.toString(i))
    .subscribe(s -> System.out.println(s));

Взгляните на это — наши Observable и Subscriber теперь выглядят так же, как и в самом начале! Мы просто добавили несколько промежуточных шагов, трансформирующих наши данные. Мы могли бы даже снова добавить код, прибавляющий мою подпись к порождаемым строкам:

Observable.just("Hello, world!")
    .map(s -> s + " -Dan")
    .map(s -> s.hashCode())
    .map(i -> Integer.toString(i))
    .subscribe(s -> System.out.println(s));

И что дальше?


Сейчас вы наверное думаете: «Ну как обычно: показывают простецкий пример, и говорят, что технология крутая, потому что она позволяет решить эту задачу в две строчки кода». Согласен, пример и правда простой. Но из него можно вынести пару полезных идей:

Идея №1: Observable и Subscriber могут делать всё, что угодно


Не ограничивайте своё воображение, возможно всё, чего вы хотите.
Ваш Observable может быть запросом к базе данных, а Subscriber может отображать на экране результаты запроса. Observable может также быть кликом по экрану, Subscriber может содержать в себе реакцию на этот клик. Observable может быть потоком байтов, принимаемых из сети, тогда как Subscriber может писать эти данные на устройство хранения данных.
Это фреймворк общего назначения, способный справиться почти с любой проблемой.

Идея №2: Observable и Subscriber не зависят от промежуточных шагов, находящихся между ними


Можно вставить сколько угодно вызовов map() в промежутке между Observable и подписанным на него Subscriber. Система является легко компонуемой, и с её помощью очень легко управлять потоком данных. Если операторы работают с корректными входными/выходными данными, можно написать цепочку преобразований бесконечной длины4.

Взгляните на эти ключевые идеи вместе и вы увидите систему с большим потенциалом. Сейчас, правда, у нас есть только один оператор map(), и с ним много не напишешь. Во второй части этой статьи мы рассмотрим большое количество операторов, доступных вам из коробки, когда вы пользуетесь RxJava.

Перейти ко второй части.


1 Subscriber имплементирует интерфейс Observer, и потому «базовым строительным блоком» назвать можно, скорее, последний, но на практике вы чаще всего будете использовать Subscriber, потому что он имеет несколько дополнительных полезных методов, в том числе и Subscriber.unsubscribe().
2 В RxJava есть «горячие» и «холодные» Observables. Горячий Observable порождает данные постоянно, даже если на него никто не подписан. Холодный Observable, соответственно, порождает данные только если у него есть хотя бы один подписчик (в статье используются именно холодные Observables). Для начальных стадий изучения RxJava эта разница не столь важна.
3 Строго говоря, Observable.just() не является полным аналогом нашего изначального кода, но почему это так происходит, я объясню только в третьей части статьи.
4 Окей, не такой уж и бесконечной, так как в какой-то момент я упрусь в ограничения, налагаемые железом, но вы понимаете, что я хотел сказать.

Комментарии (24)


  1. Bolloky
    24.08.2015 10:10
    -4

    Я бы перевёл to grok как «раздуплять» =)


    1. conf
      24.08.2015 14:43
      +1

      Лучше уж «вникать».


  1. eugenius_nsk
    24.08.2015 11:31
    +9

    Слово to grok изобретено Хайнлайном и имеет два смысла: 1) понять во всей максимально возможной полноте и 2) съесть (по сюжету романа эти два значения связаны между собой). Так что переводчики романа (равно как и вы) поступили совершенно правильно, изобретя аналогичное слово для русского языка.


  1. relgames
    24.08.2015 12:02
    +2

    Последнее время много слышу про Reactive и т.д.
    А вот в чем отличие от Java 8 Streams? Все ваши примеры в статье можно заменить на Streams.


    1. Artem_zin
      24.08.2015 13:29
      +9

      Java 8 Streams работают с Iterable, то есть Streams сами вытягивают данные, Pull API. В то время как в Rx (RxJava) Observable сам уведомляет о следующем событии (ошибка, новый элемент или конец потока), то есть это Push API.

      Кроме того, Rx — это больше про бизнес логику, обработку данных, переключение между потоками и вот это всё, в то время как Streams это про функциональные операции над коллекциями/потоками данных.

      Скажем, попробуйте с помощью Streams сделать такую вещь: сетевой запрос, который сначала вернёт локальный кеш (если он есть), потом сходит в сеть, вернёт результат из сети (и положит его в кеш), причём всё это на бэкграунд потоке(ах), а слушатель на UI потоке + обработка ошибок. Я не спорю, что это можно сделать на Streams с такой-то матерью, но Rx для подобных вещей гораздо удобнее.


      1. MercurieVV
        25.08.2015 03:10

        В кратце, я бы сказал, что Streams — для работы с коллекциями (по цепочке), rx — для работы с событиями (по цепочке). При том что у rx намного богаче функциональность.


    1. Dimezis
      24.08.2015 14:17
      +3

      Помимо вышеперечисленного:
      Для Android нет Java 8 и ее Streams, даже если использовать Retrolambda.
      Еще для Android есть RxBinding либы для различных виджетов, которые так же дают много вкусняшек и завязаны на RxJava.


      1. MercurieVV
        25.08.2015 03:12
        +2

        Я использую этот проект как стрим на андройде github.com/aNNiMON/Lightweight-Stream-API. Но он конечно не исключает rx.


  1. solver
    24.08.2015 19:19

    Может конечно придираюсь, но

    > Задача этого цикла статей состоит не в том, чтобы объяснить всё вплоть до последней запятой (вряд ли я смог бы это сделать), но, скорее в том, чтобы заинтересовать вас RxJava, и тем, как она работает.

    И сразу понеслись примеры кода…
    А где объяснение, что это вообще такое ваш RxJava и для чего его придумали?


    1. MercurieVV
      25.08.2015 03:23
      +3

      Придумали для того чтоб реалзовать парадигму реактивного программирования.
      Суть — передавать события по цепочке со всевозможными трансформациями по пути. Абстрактный пример — представте таблицу Excel. Вы изменили ячейку — это начальное событие. Но от этой ячейки зависит значение других ячеек — они слушают событие от первой ячейки, применяют его и рассылают событие что они изменились. У них так же могут быть слушатели… Это простой пример реализации реактивного программирования.


      1. awsi
        28.08.2015 00:15

        Многопоточное приложение — что будет, после того, как в 1-й ячейке изменились данные и информация была реактивно передана дальше и, что будет, если в момент реактивного изменения второй ячейки, произойдет изменение данных в 1-ой? Будет новая итерация всей системы и зачем это нужно? Или, чем закончится изменение 3-й ячейки в 1-й итерации, если данные уже изменились? Может быть лучше применить шину данных (Otto, к примеру), где все слушатели получают актуальные, в настоящий момент, данные, не зависимо от состояния системы в целом.


        1. Artem_zin
          28.08.2015 00:20
          +2

          Нет никаких ячеек :)

          Для таких случаев есть операторы, которые позволяют обрабатывать группы событий: Debounce, Window, Buffer и тд.

          Event bus тут ничем не отличается, собственно, Observable (PublishSubject) может заменить Event Bus полностью.


          1. awsi
            28.08.2015 00:29

            Ячейки — ответ на пост MercurieVV.


            1. awsi
              28.08.2015 00:36

              В том то и вопрос, почему не использовать Event Bus? Это дешевле в размере, производительности и меньше есть батарейку.


              1. Artem_zin
                28.08.2015 00:40

                Ну, если вам его хватает — отлично :)


                1. awsi
                  28.08.2015 00:54

                  Я здесь потому, что мне не хватает того, что я использую…


          1. awsi
            28.08.2015 00:46
            +1

            Группа событий — это аналог транзакций? Могу ли я получить ответ на свой вопрос? Вы готовы ответить, что будет, если данные изменятся динамически в другом потоке? Как RxJava это регламентирует?
            Исходя из простого примера с ячейками в ехеl.


            1. Artem_zin
              28.08.2015 00:58
              +2

              Группа событий — это аналог транзакций?

              Nope. Смотрите, в Rx (да и не только) есть такое понятие — Backpressure. Это когда у вас поток событий большой и они поступают быстрее, чем вы их обрабатываете. Для борьбы с Backpressure или просто для утихомиривания резкого потока событий в Rx есть несколько полезных операторов:

              Debounce — скажем, пользователь вводит текст в поле, вы на это шлёте запросы в сеть, но вы не хотите на каждый символ этого делать если пользователь быстро вводит текст, тогда Debounce ваш друг — вы указываете временное окно, скажем, 300 мс и Debounce вернет вам результат только если после последнего события прошло 300 мс, то есть вы отфильтруете частые нажатия.

              Filter — простейший оператор, в котором вы можете по какой-то вашей логике пропускать часть событий.

              Buffer — оператор, которому вы говорите, а группируй мне события по 10 штук (или сколько нужно) и отправляй списком, а не по отдельности.

              Window — похож на Buffer, но группирует события по временному промежутку, скажем, 500 мс, etc.

              И тд., всякие OnBackpressureDrop, etc, очень рекомендую посмотреть документацию с прекрасными иллюстрациями: reactivex.io


            1. Artem_zin
              28.08.2015 01:03
              +1

              Могу ли я получить ответ на свой вопрос?

              Да :)

              Вы готовы ответить, что будет, если данные изменятся динамически в другом потоке?

              Готов :)

              Как RxJava это регламентирует?

              Как напишите, так и будет работать, зависит от примененных операторов. Вы можете добиться любого нужного вам поведения, причём, довольно элегантно.

              Нужно отреагировать на все события — реагируйте, нужно пропустить близкие по времени события или сгруппировать и обработать их пачкой — пожалуйста, все возможности Rx перед вами, нужно лишь понять и прочитать документацию: reactivex.io

              P.S.
              Кстати, приходите на Yandex Mobile Camp 12 сентября (суббота) в Москве или на трансляцию в Новосибирске или Нижнем Новгороде, поговорим про Rx events.yandex.ru/events/yamobcamp :)


        1. MercurieVV
          28.08.2015 09:46

          Во первых, хочу отметить что пример с Экселем — абстрактный пример что такое реактивное программирование, а не как работает rxJava


          1. MercurieVV
            28.08.2015 10:08
            +1

            Ну а если говорить о rxJava, то ее суть — шина/цепочка/pipeline событий. Очень желательно чтоб передаваемые данные были immutable, иначе вся ответственность за его изменения ложится на вас.
            Возьмем ваш пример применительно к rxJava.
            Простейшая реализация:
            Такт 1) Ячейка (испускатель событий) 1, испускает событие со своим новым значением.
            Такт 2) Оно попадает в ячейку 2. Она его обрабатывает. В момент обработки, ячейка 1 испускает новый объект данных.
            Такт 3) В ячейку 2 приходят новые данные. Она их обрабатывает.

            Реализация с backpressure buffer:
            Такт 1) Ячейка (испускатель событий) 1, испускает событие со своим новым значением. Буффер ловит его и держит у себя не отпуская дальше.
            Такт 2) Я1 испускает новое событие. Буффер опять его ловит.
            … какое-то время спустя
            Такт N) Буффер все накопленные событие кладет в список и передает его как список в Я2

            У rxJava очень наглядные картинки по каждому оператору. Посмотрите например
            на debounce (раскройте там список rxJava в language specific) reactivex.io/documentation/operators/debounce.html#collapseRxJava
            и group reactivex.io/documentation/operators/buffer.html


            1. awsi
              28.08.2015 11:27

              Спасибо.



  1. TheDimasig
    03.09.2015 12:05

    Отличная статья, спасибо за перевод!