Привет, Хабр. Представляю вашему вниманию перевод статьи Understanding Publish, Connect, RefCount and Share in RxSwift.

В оригинале статьи используется Swift второй версии и соответствующая версия RxSwift. Я имел смелость переписать приведенные ниже куски кода под Swift 3.
Так же хочется отметить, что такие понятия, как Observable и Sequence, можно считать одним и тем же. То же касается Observer и Subscriber.

Так же рекомендую почитать про share(), share Replay(), shareReplayLatestWhileConnected() в RxSwift.


В этой статье я постараюсь объяснить такие операторы для работы с Connectable Observable в RxSwift, как publish, connect, refCount и share. Они используются вместе в различных комбинациях. Очень важно понимать разницу между:

  • publish().connect()
  • и publish().refcount() (или просто share())

Активные и пассивные Observables


Прежде чем перейти к сути, мне хотелось бы сказать пару слов о hot и cold Observables. Как по мне, так понятия горячих и холодных Observables немного размыты.

Давайте горячий Observable мы будем называть Active Sequence, а холодный Passive Sequence.

  • Active Sequence эмитит элементы постоянно, независимо от того, подписан на нее кто-нибудь или нет
  • Passive Sequence начинает эмитить элементы по запросу

Примером Passive Sequence может служить запрос в сеть, который начинается только тогда, когда мы подписались на последовательность. Примерами Active Sequence могут служить web-socket соединение, события таймера или текст, производимый UITextField'ом.

И это все. Думайте об активных и пассивных последовательностях. Понятия горячих/холодных/теплых/прохладных Observables слишком запутанны и могут сбить с толку.

Несколько подписок на один Observable


Если Вы когда-нибудь подписывались дважды (или больше) на один и тот же Observable, Вы могли бы быть удивлены результатами.

Взгляните на следующий кусочек кода:

let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
    .rx.data(request: URLRequest(url: url))

requestObservable.subscribe(onNext: {
    print($0)
})

requestObservable.subscribe(onNext: {
    print($0)
})

Взглянув в консоль, мы увидим два HTTP респонса. Observable выполнил запрос дважды, хоть это противоречит нашим ожиданиям.



share() как спасение


Очевидно, что это не то, чего мы хотим от обычного HTTP-реквеста. Но мы можем изменить такое поведение и выполнить всего один запрос. Надо просто применить оператор share() к нашему Observable.

let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
    .rx.data(request: URLRequest(url: url))
    .share()

requestObservable.subscribe(onNext: {
    print($0)
})

requestObservable.subscribe(onNext: {
    print($0)
})

Как и ожидалось, выполнился только один HTTP-запрос.


По сути, оператор share() — это просто обертка над publish().refcount().
Стоп-стоп-стоп! Что еще за publish(), что за refcount()?

publish() и его друг connect()


Тогда, когда применен оператор publish(), то Observable трансформируется в Connectable Observable. В документации ReactiveX говорится:
Connectable Observable похож на обычный Observable за исключением одного момента. Он начинает производить элементы не тогда, когда на него подписываются, а только тогда, когда на нем вызван оператор connect().
let myObservable = Observable.just(1).publish()

print("Subscribing")
myObservable.subscribe(onNext: {
    print("first = \($0)")
})

myObservable.subscribe(onNext: {
    print("second = \($0)")
})

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Calling connect after 3 seconds")
    myObservable.connect()
}

/* Output:
Subscribing
Calling connect after 3 seconds
first = 1
second = 1
*/

В приведенном выше примере, Observer'ы подписываются на myObservable сразу после того, как он был создан. Но срабатывают они только через 3 секунды, когда был вызван оператор connect(). Проще говоря, connect() активирует Connectable Observable и включает подписчиков.

Интересная штука в том, как происходит очистка ресурсов. Посмотрите на этот код.

let myObservable = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()
myObservable.connect()
print("Starting at 0 seconds")

let mySubscription = myObservable.subscribe(onNext: {
    print("Next: \($0)")
})

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Disposing at 3 seconds")
    mySubscription.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6)  {
    print("Subscribing again at 6 seconds")
    myObservable.subscribe(onNext: {
        print("Next: \($0)")
    })
}

// Output:
/* 
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 6
Next: 7
Next: 8
Next: 9
...
*/

Даже если все подписчики отписались от нашего Observable, то последний все еще живет и продолжает производить события под капотом.

Примечание переводчика
Метод connect() возвращает Disposable. Таким образом, остановить продюсинг элементов можно, вызвав метод dispose() у данного Disposable, либо предоставить эту возможность DisposeBag'у.

Теперь давайте сравним это с publish().refcount().

Разница между publish().connect() и publish().refcount()


Вы можете воспринимать оператор refcount() как магию, которая за Вас обрабатывают отписку Observer'ов. refcount() вызывает connect() автоматически, когда подписывается первый Observer, так что нет нужды делать это самостоятельно.

let myObservable = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()
    .refCount()
print("Starting at 0 seconds")

let mySubscription = myObservable.subscribe(onNext: {
    print("Next: \($0)")
})

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Disposing at 3 seconds")
    mySubscription.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6)  {
    print("Subscribing again at 6 seconds")
    myObservable.subscribe(onNext: {
        print("Next: \($0)")
    })
}
// Output:
/* 
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 0
Next: 1
Next: 2
Next: 3
...
*/

Обратите внимание вот на что. Когда мы подписались заново, Observable начал эмитить элементы с начала.

Заключение


Чувствуете разницу теперь? publish().connect() и publish().refcount() (или share()) управляют механизмом отписки от Obervable'ов по-разному.

Когда Вы используете publish().connect(), Вам необходимо вручную управлять механизмом очистки ресурсов вашего Observable (об этом говорилось в примечании под спойлером). Ваша последовательность ведет себя как активная и производит элементы все время, независимо от подписок.

С другой стороны, publish().refcount()/share() следит за том, как много Observer'ов подписано на Observable и не отключает первых от последнего до тех пор, пока существует хотя бы один подписчик. Другими словами, когда счетчик подписчиков падает до нуля, Observable «умирает» и перестает производить какие-либо элементы.

Если что-то осталось не до конца ясным, пожалуйста, дайте знать об этом в комментариях. Спасибо.

Комментарии (4)


  1. b_oberon
    29.08.2017 19:12

    Весьма хорошее введение в ReactiveX даёт https://youtu.be/3LKMwkuK0ZE, хотя и на примере RxJS.


  1. iWheelBuy
    30.08.2017 06:37

    Не хватило упоминания о других типах share (например, shareReplayWhileConnected). А так интересно было почитать, спасибо за перевод.


    1. dolenko_alexey Автор
      30.08.2017 11:28

      Есть классная статья на медиуме об этом, как-нибудь тоже переведу.


    1. dolenko_alexey Автор
      02.09.2017 14:10