Привет, Хабр. Представляю вашему вниманию перевод статьи Understanding Publish, Connect, RefCount and Share in RxSwift.
В оригинале статьи используется Swift второй версии и соответствующая версия RxSwift. Я имел смелость переписать приведенные ниже куски кода под Swift 3.
Так же хочется отметить, что такие понятия, как Observable и Sequence, можно считать одним и тем же. То же касается Observer и Subscriber.
Так же рекомендую почитать про share(), share Replay(), shareReplayLatestWhileConnected() в RxSwift.
В этой статье я постараюсь объяснить такие операторы для работы с Connectable Observable в RxSwift, как
Прежде чем перейти к сути, мне хотелось бы сказать пару слов о hot и cold Observables. Как по мне, так понятия горячих и холодных Observables немного размыты.
Давайте горячий Observable мы будем называть Active Sequence, а холодный Passive Sequence.
Примером Passive Sequence может служить запрос в сеть, который начинается только тогда, когда мы подписались на последовательность. Примерами Active Sequence могут служить web-socket соединение, события таймера или текст, производимый
И это все. Думайте об активных и пассивных последовательностях. Понятия горячих/холодных/теплых/прохладных Observables слишком запутанны и могут сбить с толку.
Если Вы когда-нибудь подписывались дважды (или больше) на один и тот же Observable, Вы могли бы быть удивлены результатами.
Взгляните на следующий кусочек кода:
Взглянув в консоль, мы увидим два HTTP респонса. Observable выполнил запрос дважды, хоть это противоречит нашим ожиданиям.
Очевидно, что это не то, чего мы хотим от обычного HTTP-реквеста. Но мы можем изменить такое поведение и выполнить всего один запрос. Надо просто применить оператор
Как и ожидалось, выполнился только один HTTP-запрос.
По сути, оператор
Стоп-стоп-стоп! Что еще за
Тогда, когда применен оператор publish(), то Observable трансформируется в Connectable Observable. В документации ReactiveX говорится:
В приведенном выше примере, Observer'ы подписываются на
Интересная штука в том, как происходит очистка ресурсов. Посмотрите на этот код.
Даже если все подписчики отписались от нашего Observable, то последний все еще живет и продолжает производить события под капотом.
Теперь давайте сравним это с
Разница между
Вы можете воспринимать оператор
Обратите внимание вот на что. Когда мы подписались заново, Observable начал эмитить элементы с начала.
Чувствуете разницу теперь?
Когда Вы используете
С другой стороны,
Если что-то осталось не до конца ясным, пожалуйста, дайте знать об этом в комментариях. Спасибо.
В оригинале статьи используется Swift второй версии и соответствующая версия RxSwift. Я имел смелость переписать приведенные ниже куски кода под Swift 3.
Так же хочется отметить, что такие понятия, как Observable и Sequence, можно считать одним и тем же. То же касается Observer и Subscriber.
Так же рекомендую почитать про share(), share Replay(), shareReplayLatestWhileConnected() в RxSwift.
В этой статье я постараюсь объяснить такие операторы для работы с Connectable Observable в RxSwift, как
publish, connect, refCount
и share
. Они используются вместе в различных комбинациях. Очень важно понимать разницу между:publish().connect()
- и
publish().refcount()
(или простоshare()
)
Активные и пассивные Observables
Прежде чем перейти к сути, мне хотелось бы сказать пару слов о hot и cold Observables. Как по мне, так понятия горячих и холодных Observables немного размыты.
Давайте горячий Observable мы будем называть Active Sequence, а холодный Passive Sequence.
- Active Sequence эмитит элементы постоянно, независимо от того, подписан на нее кто-нибудь или нет
- Passive Sequence начинает эмитить элементы по запросу
Примером Passive Sequence может служить запрос в сеть, который начинается только тогда, когда мы подписались на последовательность. Примерами Active Sequence могут служить web-socket соединение, события таймера или текст, производимый
UITextField
'ом.И это все. Думайте об активных и пассивных последовательностях. Понятия горячих/холодных/теплых/прохладных Observables слишком запутанны и могут сбить с толку.
Несколько подписок на один Observable
Если Вы когда-нибудь подписывались дважды (или больше) на один и тот же Observable, Вы могли бы быть удивлены результатами.
Взгляните на следующий кусочек кода:
let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
.rx.data(request: URLRequest(url: url))
requestObservable.subscribe(onNext: {
print($0)
})
requestObservable.subscribe(onNext: {
print($0)
})
Взглянув в консоль, мы увидим два HTTP респонса. Observable выполнил запрос дважды, хоть это противоречит нашим ожиданиям.
share()
как спасение
Очевидно, что это не то, чего мы хотим от обычного HTTP-реквеста. Но мы можем изменить такое поведение и выполнить всего один запрос. Надо просто применить оператор
share()
к нашему Observable.let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
.rx.data(request: URLRequest(url: url))
.share()
requestObservable.subscribe(onNext: {
print($0)
})
requestObservable.subscribe(onNext: {
print($0)
})
Как и ожидалось, выполнился только один HTTP-запрос.
По сути, оператор
share()
— это просто обертка над publish().refcount()
.Стоп-стоп-стоп! Что еще за
publish()
, что за refcount()
?publish()
и его друг connect()
Тогда, когда применен оператор publish(), то Observable трансформируется в Connectable Observable. В документации ReactiveX говорится:
Connectable Observable похож на обычный Observable за исключением одного момента. Он начинает производить элементы не тогда, когда на него подписываются, а только тогда, когда на нем вызван оператор connect()
.
let myObservable = Observable.just(1).publish()
print("Subscribing")
myObservable.subscribe(onNext: {
print("first = \($0)")
})
myObservable.subscribe(onNext: {
print("second = \($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("Calling connect after 3 seconds")
myObservable.connect()
}
/* Output:
Subscribing
Calling connect after 3 seconds
first = 1
second = 1
*/
В приведенном выше примере, Observer'ы подписываются на
myObservable
сразу после того, как он был создан. Но срабатывают они только через 3 секунды, когда был вызван оператор connect()
. Проще говоря, connect()
активирует Connectable Observable и включает подписчиков.Интересная штука в том, как происходит очистка ресурсов. Посмотрите на этот код.
let myObservable = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.publish()
myObservable.connect()
print("Starting at 0 seconds")
let mySubscription = myObservable.subscribe(onNext: {
print("Next: \($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("Disposing at 3 seconds")
mySubscription.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
print("Subscribing again at 6 seconds")
myObservable.subscribe(onNext: {
print("Next: \($0)")
})
}
// Output:
/*
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 6
Next: 7
Next: 8
Next: 9
...
*/
Даже если все подписчики отписались от нашего Observable, то последний все еще живет и продолжает производить события под капотом.
Примечание переводчика
Метод
connect()
возвращает Disposable
. Таким образом, остановить продюсинг элементов можно, вызвав метод dispose()
у данного Disposable
, либо предоставить эту возможность DisposeBag
'у.Теперь давайте сравним это с
publish().refcount()
.Разница между publish().connect()
и publish().refcount()
Вы можете воспринимать оператор
refcount()
как магию, которая за Вас обрабатывают отписку Observer'ов. refcount()
вызывает connect()
автоматически, когда подписывается первый Observer, так что нет нужды делать это самостоятельно.let myObservable = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.publish()
.refCount()
print("Starting at 0 seconds")
let mySubscription = myObservable.subscribe(onNext: {
print("Next: \($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("Disposing at 3 seconds")
mySubscription.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
print("Subscribing again at 6 seconds")
myObservable.subscribe(onNext: {
print("Next: \($0)")
})
}
// Output:
/*
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 0
Next: 1
Next: 2
Next: 3
...
*/
Обратите внимание вот на что. Когда мы подписались заново, Observable начал эмитить элементы с начала.
Заключение
Чувствуете разницу теперь?
publish().connect()
и publish().refcount()
(или share()
) управляют механизмом отписки от Obervable'ов по-разному.Когда Вы используете
publish().connect()
, Вам необходимо вручную управлять механизмом очистки ресурсов вашего Observable (об этом говорилось в примечании под спойлером). Ваша последовательность ведет себя как активная и производит элементы все время, независимо от подписок.С другой стороны,
publish().refcount()/share()
следит за том, как много Observer'ов подписано на Observable и не отключает первых от последнего до тех пор, пока существует хотя бы один подписчик. Другими словами, когда счетчик подписчиков падает до нуля, Observable «умирает» и перестает производить какие-либо элементы.Если что-то осталось не до конца ясным, пожалуйста, дайте знать об этом в комментариях. Спасибо.
Комментарии (4)
iWheelBuy
30.08.2017 06:37Не хватило упоминания о других типах share (например, shareReplayWhileConnected). А так интересно было почитать, спасибо за перевод.
dolenko_alexey Автор
30.08.2017 11:28Есть классная статья на медиуме об этом, как-нибудь тоже переведу.
b_oberon
Весьма хорошее введение в ReactiveX даёт https://youtu.be/3LKMwkuK0ZE, хотя и на примере RxJS.