И так как же создать бесконечность? Возьмем и реализуем корекурсию с помощью нашего ленивца Observable. Почему же мы не можем применить рекурсию, ведь она в теории может быть тоже бесконечна? Потому что в теории это конечно — хорошо, но мы то программисты живем в реальном мире, а в нем мы вычисления никогда не завершим, и такая бесконечность нам не нужна.
Позвольте привести примеры c помощью «андройдовской» Java, т.к мы будем применять все это дело, после не большой модернизации, в реальном проекте:
1) Пример с рекурсией или как не нужно делать:
public Observable<BigInteger> getState(Context context) {
BigInteger i = ZERO;
return Observable.create(
subscriber -> {
while (true) subscriber.onNext(i);
i = i.add(ONE);
}
);
}
2) Пример с отпиской или как нужно делать:
public Observable<Boolean> getNetworkState(Context context) {
BigInteger i = ZERO;
return Observable.create(
subscriber -> {
Runnable r = () -> {
while (!subscriber.isUnsubscribed())
while (true) subscriber.onNext(i);
i = i.add(ONE);
};
new Thread(r).start();
}
);
}
А теперь подробнее — что же происходит в первом и втором случае? В первом случае, рекурсия блокирует поток, по скольку вычисления никогда не кончится, и вы сами, наверняка, догадываетесь к чему это приведет. Поэтому, создадим явную конкурентность. Во втором примере мы создаем отдельный поток и будем делать события в нем.
И самое интересное то, что так как subscribe() всего лишь навсего создает новый поток, мы можем этот поток завершить всего лишь отписавшись от события. Очень удобно и без всяких interrup'ов.
Так как мы будем работать в отдельном потоке, то мы без потери производительности можем в реальном времени отслеживать различные состояния. И потом — так же легко взаимодействовать с UI через Handler.
Как же нам теперь все это применить в реальном проекте? Очень просто. Сейчас я покажу на актуальном, на мой взгляд примере — обнаружение сети.
Для начала — создадим класс, с нашим публичным Observable, на который мы можем подписаться, и приватным методом, который проверяет состояние сети. Вот код с документацией:
public class NetworkState {
/**
* @param context контекст в котором должен работать метод (<i>в основном
это будет {@link android.app.Activity}</i>)
* @return {@link Observable<Boolean>} Мгновенно возвращает состояние сети:
* <br>
* <b>true</b> - сеть есть
* </br>
*
* <br>
* <b>false</b> - сети нету
* </br>
*/
public Observable<Boolean> getNetworkState(Context context) {
return Observable.create(
subscriber -> {
Runnable r = () -> {
while (!subscriber.isUnsubscribed())
subscriber.onNext(hasConnection(context));
};
new Thread(r).start();
}
);
}
/**
* Проверка на наличие сети
* @param context Контекст
* @return <b>true</b> - сеть есть, <b>false</b> - сети нет
*/
private static boolean hasConnection(final Context context) {
ConnectivityManager connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo wifiInfo = connectivityManager.getNetworkInfo(ConnectivityManager.TYPE_WIFI);
NetworkInfo mobileInfo = connectivityManager.getNetworkInfo(ConnectivityManager.TYPE_MOBILE);
return wifiInfo != null && wifiInfo.isConnected() || mobileInfo != null && mobileInfo.isConnected();
}
}
Теперь, собственно, применение (Не забудьте где нибудь проинициализировать вьюшки):
/**
* Собственно тут все очевидно, дизейблим некую кнопку и показываем сообщение в TextView, о том, что нету сети
**/
private void rxNetworkCheck(){
new NetworkManager().getNetworkState(this).subscribe(x ->
handlerNetwork.post(() -> {
if(x){
buttonNext.setVisibility(View.VISIBLE);
textViewNoConnection.setVisibility(View.INVISIBLE);
}
else{
buttonNext.setVisibility(View.INVISIBLE);
textViewNoConnection.setVisibility(View.VISIBLE);
}
})
);
}
Подведем итоги: бесконечные потоки — вполне реальны, они применимы в реальных проектах, они очень удобны и отзывчивы. При тестировании данной концепции в своем проекте я не заметил ни каких «проседаний» в производительности или других артефактов. Конечно же этот способ актуален не только для обнаружении сети, тут можно не ограничивать себя в фантазии.
Вот, собственно, и все, что я хотел сказать, надеюсь своим постом принес кому-то пользу либо проявил интерес к данной теме.
Ну и конечно же книга которая помогла мне и продолжает помогать в изучении реактивного подхода к разработки, это — «Реактивное программирование с применением RxJava» от Томаш Нуркевича и Бена Кристенсена.
Комментарии (35)
AlexeyVD
20.09.2017 18:58Во-первых, в примерах 1) и 2) у вас приведен какой-то кривой нерабочий код с бесконечными циклами.
Во-вторых, раз уж статья называется не «Бесконечные потоки с помощью Runnable и их применение в Android проектах», а идет речь о RX, то можно было бы сказать пару слов о backpressure и Flowable. Т.к. исполюзую ваш подход, разработчики рано или поздно словят MissingBackpressureException.Implozia Автор
20.09.2017 20:43-2И первый и второй код рабочий, я проверял, второй прекрасно живет в проекте, конечно можно было бы и приостанавливать потоки на какое-то время или сделать более гибко с проверками, но в моем проекте была необходимость чекать сеть постоянно, а в пример я поставил именно этот кусок, как основу, от чего можно оттолкнуться, статья только о том что можно делать, к примеру — так, у меня не было цели создать кучу кейсов под все случаи жизни
AlexeyVD
20.09.2017 21:13+1Вот тут бесконечный цикл. i = i.add(ONE) никогда не вызовется:
subscriber -> { while (true) subscriber.onNext(i); i = i.add(ONE); }
Тут тоже самое. Второй while зацыклен, !subscriber.isUnsubscribed() будет проверено только 1 раз при входе в первый цикл, i = i.add(ONE) никогда не вызовется:
subscriber -> { Runnable r = () -> { while (!subscriber.isUnsubscribed()) while (true) subscriber.onNext(i); i = i.add(ONE); }; new Thread(r).start(); }
Ну и для периодических запросов можно использовать Observer.interval(), а переключать потоки выполнения через .observeOn()Implozia Автор
20.09.2017 21:26-3Первый пример и написан как не правильный и как не нужно делать. Чувствую мне все таки придется сделать тестовую приложуху и все разжевать более детально. Опять же моя статья не претендует на панацею, моя основная мысль показать, обратить внимание, однако от коментов типа — «не рабочий код» у меня не хило так бомбануло, и, все таки я объясню все более подробнее и свою мысль с пруфами, пожалуйста, подождите немного)
virtustilus
22.09.2017 14:58Во втором примере, как и в первом зацикливается навсегда и остановить его невозможно.
Потому что:
while (true) subscriber.onNext(i);
— никогда не остановится, кроме случая возникновения исключения.Implozia Автор
22.09.2017 15:02Он остановится, если от этого Observable отписаться
mayorovp
22.09.2017 15:09Каким образом?
Implozia Автор
22.09.2017 15:13unsubscribe()
mayorovp
22.09.2017 15:16И каким образом этот вызов прервет цикл?
Implozia Автор
22.09.2017 15:20Там же условие цикла — while(!subscriber.isUnsubscribed()), вроде логично что он таким образом прекращает свою работу, можете сами проверить
mayorovp
22.09.2017 15:23Где вы в цикле
while (true) subscriber.onNext(i);
видите проверку!subscriber.isUnsubscribed()
? Я вижуtrue
.
mayorovp
22.09.2017 15:27На всякий случай: вот та самая ошибка, за которую вам третий день ставят минусы:
Вот так хорошо бесконечный цикл видно или надо линию жирнее рисовать?
Implozia Автор
22.09.2017 15:38Во первых — вложенный цикл прервется тоже. Во вторых это даже не важно, потому что Observable — ленивый тип и если подписчиков на него не будет, то ничего не произойдет. Observable работает только при подписке
mayorovp
22.09.2017 15:43Каким образом прервется вложенный цикл?
Implozia Автор
22.09.2017 15:49-1Давайте я вам все подробнее на пруфах покажу, как только домой приду? Потому что так, думаю, вопросы не кончатся. Но пока что просто скажу — таким образом, что некому будет эти события производить при отписке
virtustilus
22.09.2017 16:43Возможно это еще дополнительно связано с плохим тоном написания кода (хотя некоторые все-таки за минимализм).
В общем Ваш код можно переписать вот так:
while (!subscriber.isUnsubscribed()) { while (true) { subscriber.onNext(i); } } i = i.add(ONE);
Здесь нужно понимать, что проверка subscriber.isUnsubscribed() выполнится один раз в момент запуска этого кода. И если подписчик подписан (что должно быть), то следующим шагом запустится второй цикл, который будет выполняться, пока у него в условии true, а там true, то есть выхода из цикла не будет никогда.
anegin
22.09.2017 19:40Условие во внешнем цикле будет проверено только один раз. внутренний цикл будет выполняться бесконечно.
Чтобы работало так, как вы задумали, нужно, как минимум, убрать внутренний while:
while (!subscriber.isUnsubscribed()) subscriber.onNext(i);
К тому же не помешало бы добавить Thread.sleep() внутрь цикла, чтобы дать процессору возможность отвлечься на другие задачи.
В любом случае, это не отменяет того факта, что данный подход изначально bad style
Ztare
20.09.2017 20:36+1А поток, который в таком режиме как в примере работает, не загрузит ядро процессора на 100% (опрос состояния + постоянное обращение к UI)? RxJava добавит какие-то свои задержки\ожидания?
Implozia Автор
20.09.2017 20:46-5Нет не загрузит, если наберутся лайки к твоему комменту, я сделаю тестовый проект на гите, с этим примером, и напишу конкретную статью о нем со всеми графиками и пояснениями)
a15199732
20.09.2017 20:50+3КГ/АМНе совсем удачная статья.
1. Observable.create() deprecated
2. MissingBackpressureException неизбежен
3. Чтобы выполнить Observable в отдельном потоке, есть SubscribeOn()
4. Практически любая задача превращения одиночных событий в поток лучше всего решается с помощью Subject
Implozia Автор
20.09.2017 21:02-41) Это применимо для RxJava2, а я писал на первой
2) Избежен
3) Можно, конечно, кто Вам не дает?)
4) Блин, я написал об идее, а не о том как Вы будете ее реализовывать)
Tagakov
20.09.2017 21:06+2Это статья с очень вредными советами, никогда так не делайте! Если вы новичок и пытаетесь освоить Rx — закройте эту статью и поищите другую.
anegin
20.09.2017 23:12+1Очень неудачный пример того, что обычно решается стандартными средствами (BroadcastReceiver). Еже(милли)секундный апдейт UI там, где он должен изменятся только по факту пропадания/появления коннекта — это вообще нечто.
artemgapchenko
20.09.2017 23:38Вначале:
Пример с отпиской или как нужно делать
return Observable.create( subscriber -> { Runnable r = () -> { while (!subscriber.isUnsubscribed()) while (true) subscriber.onNext(i); i = i.add(ONE); }; new Thread(r).start(); } );
И в конце:
Ну и конечно же книга которая помогла мне и продолжает помогать в изучении реактивного подхода к разработки, это — «Реактивное программирование с применением RxJava» от Томаш Нуркевича и Бена Кристенсена.
Блин, ну как вы так читали-то, там же объясняется, что 1) Создавать observables через create() — дурная затея, так как почти все (и вы в том числе) забывают про обработку backpressure; 2) Запускать треды внутри Observable.create() — дурно пахнущий код, для работы с потоками subscribeOn()/observeOn() есть.Implozia Автор
20.09.2017 23:57-1subscrubeOn() На сколько я знаю очень удобен для использования пула потоков, а в моем примере всего один маленький тредик, в подобных примерах и авторы книги применяют такой подход
gildor
21.09.2017 08:13Чем же subscrubeOn() не удобен при использовании одного потока? Это стандартный механизм и использование голого треда тут ничем не оправдано
mayorovp
21.09.2017 08:39Конкретно в данном случае он неудобен тем что занимает на неопределенное время один поток из пула. Стандартные пулы потоков попросту не для такого сценария использования делались.
gildor
21.09.2017 08:43Так можно создать свой Scheduler из одного потока, который ничего из пула красть не будет, как и io Schedulers.io(), который не имеет ограничения на число потоков и просто создаст новый в случае необходимости.
Ну я с вами соглашусь, что RxJava точно не для такого делалась, как пример в статьеmayorovp
21.09.2017 08:48+1Из одного потока пул создавать нельзя, потому что подписчиков может оказаться два — и тогда потребуется два потока.
Да, я почему-то думал что `subscribeOn` принимает `ExecutorService` а не `rx.Scheduler`. Конечно, `Schedulers.io()` использовать можно. Хотя мне для такого странного кода больше нравится `Schedulers.newThread()` :-)
artemgapchenko
21.09.2017 08:29+1Всего один маленький тредик, которой можно было бы и не создавать, если взять его из уже созданного и правильным образом сконфигурированного пула потоков.
Implozia Автор
20.09.2017 23:54-3Ребят, я всех услышал, вот здесь я максимально подробно написал, что я пытался донести до вас этой статьей: habrahabr.ru/post/338372
Возможно мы друг друга не поняли, если и тут я не прав, напишите, приму к сведению в дальнейшем, и буду планировать следующую статью, если, конечно будет что написать, максимально подробно. Прошу прощения, если я в ней не прав, но пригорело у меня знатно
gildor
21.09.2017 08:06Тут уже упомянался Observable.create() и то что это дурной тон и контр аргумент, что он не задепрекейчен больше и является рекомендованы.
Хотел просто внести ясность, что есть deprecated Observable.create(), его не deprecated версия называется Observable.unsafeCreate().
Рекомендованый билдер так же называется Observable.create(), но использует совершенно другую сигнатуру и является просто переименованой версией того, что раньше называлось Observable.fromEmitter() и в данный момент deprecated в rxjava 1.x
ulman95
Зря конечно слово «велосипед» зачеркнули, к примерам кода отлично подходит. В первоиздании
ulman95
… в первоиздании запуск потока внутри create() не являлся эталонным решением.