Доброго времени суток, друзья. Хотел бы поделиться своим не большим велосипедом открытием в области бесконечных потоков с помощью Observable, применении их в android проектах, а так же немного рассказать теорию (если не уснете к концу статьи).

И так как же создать бесконечность? Возьмем и реализуем корекурсию с помощью нашего ленивца Observable. Почему же мы не можем применить рекурсию, ведь она в теории может быть тоже бесконечна? Потому что в теории это конечно — хорошо, но мы то программисты живем в реальном мире, а в нем мы вычисления никогда не завершим, и такая бесконечность нам не нужна.

Позвольте привести примеры c помощью «андройдовской» Java, т.к мы будем применять все это дело, после не большой модернизации, в реальном проекте:

1) Пример с рекурсией или как не нужно делать:

public Observable<BigInteger> getState(Context context) {
        BigInteger i = ZERO;
        return Observable.create(
                subscriber -> {
                        while (true) subscriber.onNext(i);
                                i = i.add(ONE); 
                }
        );
    }

2) Пример с отпиской или как нужно делать:

public Observable<Boolean> getNetworkState(Context context) {
        BigInteger i = ZERO;
        return Observable.create(
                subscriber -> {
                    Runnable r = () -> {
                        while (!subscriber.isUnsubscribed())
                            while (true) subscriber.onNext(i);
                            i = i.add(ONE); 
                    };
                    new Thread(r).start();
                }
        );
    }

А теперь подробнее — что же происходит в первом и втором случае? В первом случае, рекурсия блокирует поток, по скольку вычисления никогда не кончится, и вы сами, наверняка, догадываетесь к чему это приведет. Поэтому, создадим явную конкурентность. Во втором примере мы создаем отдельный поток и будем делать события в нем.

И самое интересное то, что так как subscribe() всего лишь навсего создает новый поток, мы можем этот поток завершить всего лишь отписавшись от события. Очень удобно и без всяких interrup'ов.

Так как мы будем работать в отдельном потоке, то мы без потери производительности можем в реальном времени отслеживать различные состояния. И потом — так же легко взаимодействовать с UI через Handler.

Как же нам теперь все это применить в реальном проекте? Очень просто. Сейчас я покажу на актуальном, на мой взгляд примере — обнаружение сети.

Для начала — создадим класс, с нашим публичным Observable, на который мы можем подписаться, и приватным методом, который проверяет состояние сети. Вот код с документацией:

public class NetworkState {
    /**
     * @param context контекст в котором должен работать метод (<i>в основном
     это будет {@link android.app.Activity}</i>)
     * @return {@link Observable<Boolean>} Мгновенно возвращает состояние сети:
     * <br>
     *      <b>true</b> - сеть есть
     * </br>
     *
     * <br>
     *      <b>false</b> - сети нету
     * </br>
     */
    public Observable<Boolean> getNetworkState(Context context) {
        return Observable.create(
                subscriber -> {
                    Runnable r = () -> {
                        while (!subscriber.isUnsubscribed())
                            subscriber.onNext(hasConnection(context));
                    };
                    new Thread(r).start();
                }
        );
    }

    /**
     * Проверка на наличие сети
     * @param context Контекст
     * @return <b>true</b> - сеть есть,  <b>false</b> - сети нет
      */
    private static boolean hasConnection(final Context context) {
        ConnectivityManager connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);

        NetworkInfo wifiInfo = connectivityManager.getNetworkInfo(ConnectivityManager.TYPE_WIFI);
        NetworkInfo mobileInfo = connectivityManager.getNetworkInfo(ConnectivityManager.TYPE_MOBILE);

        return wifiInfo != null && wifiInfo.isConnected() || mobileInfo != null && mobileInfo.isConnected();
    }
}

Теперь, собственно, применение (Не забудьте где нибудь проинициализировать вьюшки):

/**
* Собственно тут все очевидно, дизейблим некую кнопку и показываем сообщение в TextView, о том, что нету сети 
**/
private void rxNetworkCheck(){
        new NetworkManager().getNetworkState(this).subscribe(x ->
                handlerNetwork.post(() -> {
                    if(x){
                        buttonNext.setVisibility(View.VISIBLE);
                        textViewNoConnection.setVisibility(View.INVISIBLE);
                    }
                    else{
                        buttonNext.setVisibility(View.INVISIBLE);
                        textViewNoConnection.setVisibility(View.VISIBLE);
                    }
                })
        );
    }

Подведем итоги: бесконечные потоки — вполне реальны, они применимы в реальных проектах, они очень удобны и отзывчивы. При тестировании данной концепции в своем проекте я не заметил ни каких «проседаний» в производительности или других артефактов. Конечно же этот способ актуален не только для обнаружении сети, тут можно не ограничивать себя в фантазии.

Вот, собственно, и все, что я хотел сказать, надеюсь своим постом принес кому-то пользу либо проявил интерес к данной теме.

Ну и конечно же книга которая помогла мне и продолжает помогать в изучении реактивного подхода к разработки, это — «Реактивное программирование с применением RxJava» от Томаш Нуркевича и Бена Кристенсена.

Комментарии (35)


  1. ulman95
    20.09.2017 18:45

    Зря конечно слово «велосипед» зачеркнули, к примерам кода отлично подходит. В первоиздании


    1. ulman95
      20.09.2017 20:38

      … в первоиздании запуск потока внутри create() не являлся эталонным решением.


  1. AlexeyVD
    20.09.2017 18:58

    Во-первых, в примерах 1) и 2) у вас приведен какой-то кривой нерабочий код с бесконечными циклами.

    Во-вторых, раз уж статья называется не «Бесконечные потоки с помощью Runnable и их применение в Android проектах», а идет речь о RX, то можно было бы сказать пару слов о backpressure и Flowable. Т.к. исполюзую ваш подход, разработчики рано или поздно словят MissingBackpressureException.


    1. Implozia Автор
      20.09.2017 20:43
      -2

      И первый и второй код рабочий, я проверял, второй прекрасно живет в проекте, конечно можно было бы и приостанавливать потоки на какое-то время или сделать более гибко с проверками, но в моем проекте была необходимость чекать сеть постоянно, а в пример я поставил именно этот кусок, как основу, от чего можно оттолкнуться, статья только о том что можно делать, к примеру — так, у меня не было цели создать кучу кейсов под все случаи жизни


      1. AlexeyVD
        20.09.2017 21:13
        +1

        Вот тут бесконечный цикл. i = i.add(ONE) никогда не вызовется:

                        subscriber -> {
                                while (true) subscriber.onNext(i);
                                i = i.add(ONE); 
                        }
        

        Тут тоже самое. Второй while зацыклен, !subscriber.isUnsubscribed() будет проверено только 1 раз при входе в первый цикл, i = i.add(ONE) никогда не вызовется:
                        subscriber -> {
                            Runnable r = () -> {
                                while (!subscriber.isUnsubscribed())
                                    while (true) subscriber.onNext(i);
                                i = i.add(ONE); 
                            };
                            new Thread(r).start();
                        }
        

        Ну и для периодических запросов можно использовать Observer.interval(), а переключать потоки выполнения через .observeOn()


        1. Implozia Автор
          20.09.2017 21:26
          -3

          Первый пример и написан как не правильный и как не нужно делать. Чувствую мне все таки придется сделать тестовую приложуху и все разжевать более детально. Опять же моя статья не претендует на панацею, моя основная мысль показать, обратить внимание, однако от коментов типа — «не рабочий код» у меня не хило так бомбануло, и, все таки я объясню все более подробнее и свою мысль с пруфами, пожалуйста, подождите немного)


          1. virtustilus
            22.09.2017 14:58

            Во втором примере, как и в первом зацикливается навсегда и остановить его невозможно.
            Потому что:
            while (true) subscriber.onNext(i); — никогда не остановится, кроме случая возникновения исключения.


            1. Implozia Автор
              22.09.2017 15:02

              Он остановится, если от этого Observable отписаться


              1. mayorovp
                22.09.2017 15:09

                Каким образом?


                1. Implozia Автор
                  22.09.2017 15:13

                  unsubscribe()


                  1. mayorovp
                    22.09.2017 15:16

                    И каким образом этот вызов прервет цикл?


                    1. Implozia Автор
                      22.09.2017 15:20

                      Там же условие цикла — while(!subscriber.isUnsubscribed()), вроде логично что он таким образом прекращает свою работу, можете сами проверить


                      1. mayorovp
                        22.09.2017 15:23

                        Где вы в цикле while (true) subscriber.onNext(i); видите проверку !subscriber.isUnsubscribed()? Я вижу true.


                      1. mayorovp
                        22.09.2017 15:27

                        На всякий случай: вот та самая ошибка, за которую вам третий день ставят минусы:



                        Вот так хорошо бесконечный цикл видно или надо линию жирнее рисовать?


                        1. Implozia Автор
                          22.09.2017 15:38

                          Во первых — вложенный цикл прервется тоже. Во вторых это даже не важно, потому что Observable — ленивый тип и если подписчиков на него не будет, то ничего не произойдет. Observable работает только при подписке


                          1. mayorovp
                            22.09.2017 15:43

                            Каким образом прервется вложенный цикл?


                            1. Implozia Автор
                              22.09.2017 15:49
                              -1

                              Давайте я вам все подробнее на пруфах покажу, как только домой приду? Потому что так, думаю, вопросы не кончатся. Но пока что просто скажу — таким образом, что некому будет эти события производить при отписке


                              1. virtustilus
                                22.09.2017 16:43

                                Возможно это еще дополнительно связано с плохим тоном написания кода (хотя некоторые все-таки за минимализм).
                                В общем Ваш код можно переписать вот так:

                                   while (!subscriber.isUnsubscribed()) {
                                             while (true) {
                                                      subscriber.onNext(i);
                                             }
                                   }
                                   
                                   i = i.add(ONE); 
                                


                                Здесь нужно понимать, что проверка subscriber.isUnsubscribed() выполнится один раз в момент запуска этого кода. И если подписчик подписан (что должно быть), то следующим шагом запустится второй цикл, который будет выполняться, пока у него в условии true, а там true, то есть выхода из цикла не будет никогда.


                              1. anegin
                                22.09.2017 19:40

                                Условие во внешнем цикле будет проверено только один раз. внутренний цикл будет выполняться бесконечно.
                                Чтобы работало так, как вы задумали, нужно, как минимум, убрать внутренний while:


                                while (!subscriber.isUnsubscribed())
                                    subscriber.onNext(i);

                                К тому же не помешало бы добавить Thread.sleep() внутрь цикла, чтобы дать процессору возможность отвлечься на другие задачи.
                                В любом случае, это не отменяет того факта, что данный подход изначально bad style


  1. Ztare
    20.09.2017 20:36
    +1

    А поток, который в таком режиме как в примере работает, не загрузит ядро процессора на 100% (опрос состояния + постоянное обращение к UI)? RxJava добавит какие-то свои задержки\ожидания?


    1. Implozia Автор
      20.09.2017 20:46
      -5

      Нет не загрузит, если наберутся лайки к твоему комменту, я сделаю тестовый проект на гите, с этим примером, и напишу конкретную статью о нем со всеми графиками и пояснениями)


  1. a15199732
    20.09.2017 20:50
    +3

    КГ/АМ Не совсем удачная статья.
    1. Observable.create() deprecated
    2. MissingBackpressureException неизбежен
    3. Чтобы выполнить Observable в отдельном потоке, есть SubscribeOn()
    4. Практически любая задача превращения одиночных событий в поток лучше всего решается с помощью Subject


    1. Goshik
      21.09.2017 00:41

      Observable.create() deprecated

      Уже нет, начиная с Rx 2.0. Пруф:
      Each reactive base type Flowable, Observable, Single, Maybe and Completable feature a safe create operator that does the right thing regarding backpressure (for Flowable) and cancellation (all)


  1. Implozia Автор
    20.09.2017 21:02
    -4

    1) Это применимо для RxJava2, а я писал на первой
    2) Избежен
    3) Можно, конечно, кто Вам не дает?)
    4) Блин, я написал об идее, а не о том как Вы будете ее реализовывать)


  1. Tagakov
    20.09.2017 21:06
    +2

    Это статья с очень вредными советами, никогда так не делайте! Если вы новичок и пытаетесь освоить Rx — закройте эту статью и поищите другую.


  1. anegin
    20.09.2017 23:12
    +1

    Очень неудачный пример того, что обычно решается стандартными средствами (BroadcastReceiver). Еже(милли)секундный апдейт UI там, где он должен изменятся только по факту пропадания/появления коннекта — это вообще нечто.


  1. artemgapchenko
    20.09.2017 23:38

    Вначале:

    Пример с отпиской или как нужно делать
    return Observable.create(
                    subscriber -> {
                        Runnable r = () -> {
                            while (!subscriber.isUnsubscribed())
                                while (true) subscriber.onNext(i);
                                i = i.add(ONE); 
                        };
                        new Thread(r).start();
                    }
            );


    И в конце:
    Ну и конечно же книга которая помогла мне и продолжает помогать в изучении реактивного подхода к разработки, это — «Реактивное программирование с применением RxJava» от Томаш Нуркевича и Бена Кристенсена.

    Блин, ну как вы так читали-то, там же объясняется, что 1) Создавать observables через create() — дурная затея, так как почти все (и вы в том числе) забывают про обработку backpressure; 2) Запускать треды внутри Observable.create() — дурно пахнущий код, для работы с потоками subscribeOn()/observeOn() есть.


    1. Implozia Автор
      20.09.2017 23:57
      -1

      subscrubeOn() На сколько я знаю очень удобен для использования пула потоков, а в моем примере всего один маленький тредик, в подобных примерах и авторы книги применяют такой подход


      1. gildor
        21.09.2017 08:13

        Чем же subscrubeOn() не удобен при использовании одного потока? Это стандартный механизм и использование голого треда тут ничем не оправдано


        1. mayorovp
          21.09.2017 08:39

          Конкретно в данном случае он неудобен тем что занимает на неопределенное время один поток из пула. Стандартные пулы потоков попросту не для такого сценария использования делались.


          1. gildor
            21.09.2017 08:43

            Так можно создать свой Scheduler из одного потока, который ничего из пула красть не будет, как и io Schedulers.io(), который не имеет ограничения на число потоков и просто создаст новый в случае необходимости.
            Ну я с вами соглашусь, что RxJava точно не для такого делалась, как пример в статье


            1. mayorovp
              21.09.2017 08:48
              +1

              Из одного потока пул создавать нельзя, потому что подписчиков может оказаться два — и тогда потребуется два потока.

              Да, я почему-то думал что `subscribeOn` принимает `ExecutorService` а не `rx.Scheduler`. Конечно, `Schedulers.io()` использовать можно. Хотя мне для такого странного кода больше нравится `Schedulers.newThread()` :-)


      1. artemgapchenko
        21.09.2017 08:29
        +1

        Всего один маленький тредик, которой можно было бы и не создавать, если взять его из уже созданного и правильным образом сконфигурированного пула потоков.


  1. Implozia Автор
    20.09.2017 23:54
    -3

    Ребят, я всех услышал, вот здесь я максимально подробно написал, что я пытался донести до вас этой статьей: habrahabr.ru/post/338372
    Возможно мы друг друга не поняли, если и тут я не прав, напишите, приму к сведению в дальнейшем, и буду планировать следующую статью, если, конечно будет что написать, максимально подробно. Прошу прощения, если я в ней не прав, но пригорело у меня знатно


  1. gildor
    21.09.2017 08:06

    Тут уже упомянался Observable.create() и то что это дурной тон и контр аргумент, что он не задепрекейчен больше и является рекомендованы.
    Хотел просто внести ясность, что есть deprecated Observable.create(), его не deprecated версия называется Observable.unsafeCreate().
    Рекомендованый билдер так же называется Observable.create(), но использует совершенно другую сигнатуру и является просто переименованой версией того, что раньше называлось Observable.fromEmitter() и в данный момент deprecated в rxjava 1.x