В Android-комьюнити я встречал три типа разработчиков, которые сталкивались с RxRelay:

  1. Те, кто не понимают зачем RxRelay используется в их проекте, зачем он нужен и чем отличается от Subject
  2. Те, кто думают, что RxRelay «проглатывает» ошибки или «после того, как произошла ошибка RxRelay, продолжит работать, а Subject — нет» (та самая магия)
  3. Те, кто действительно знает, что такое RxRelay.

Пока первые два типа встречаются чаще, я решил написать статью, которая поможет разобраться в том, как работает RxRelay и проверить его «магические» свойства.

Если вы используете RxJava, то вероятно вы пользуетесь Subject или RxRelay, чтобы прокидывать события из одной сущности в другую или делать из императивного кода реактивный.

Давайте проверим пункт №2 и посмотрим, в чем разница между RxRelay и Subject. Итак, у нас есть две подписки на один relay, при клике на кнопку мы пушим единицу в этот relay.

class MainActivity : AppCompatActivity() {
   private val relay = PublishRelay.create<Int>()
   private var isError: Boolean = false

   override fun onCreate(savedInstanceState: Bundle?) {
       super.onCreate(savedInstanceState)
       setContentView(R.layout.activity_main)

       val disposable1 = relay
           .map {
               if (isError) {
                   isError = false
                   throw Exception()
               } else {
                   isError = true
               }
           }.subscribe(
               {
                   Log.d("test", "Цепочка с ошибкой: onNext")
               },
               {
                   Log.d("test", "Цепочка с ошибкой: onError")
               }
           )

       val disposable2 = relay
           .subscribe(
               {
                   Log.d("test", "Цепочка без ошибки: onNext")
               },
               {
                   Log.d("test", "Цепочка без ошибки: onError")
               }
           )

       btn.setOnClickListener {
           relay.accept(1)
       }
   }
}

Три раза подряд кликаем на кнопку и видим вот такой лог.
D/test: Цепочка с ошибкой: onNext
D/test: Цепочка без ошибки: onNext

D/test: Цепочка с ошибкой: onError
D/test: Цепочка без ошибки: onNext

D/test: Цепочка без ошибки: onNext

Если заменить переменную RxRelay на PublishSubject, лог не изменится. И вот почему:

При первом клике мы пушим в наш relay данные. Оба подписчика срабатывают.

При втором клике в цепочке у первого подписчика (disposable1) возникает ошибка.

При третьем клике первый disposable1 уже не срабатывает, так как он получил терминальное состояние onError. Дальше будет работать только второй disposable2.

Так будет и с Subject, и с RxRelay. Напомню, что в rx ошибки идут вниз по цепочке к подписчику (downstream) и выше места, где они возникли, не попадают. В итоге мы проверили, что цепочка на основе RxRelay не может работать после того, как возникла ошибка.

Так если разницы в поведении Subject и RxRelay нет, то в чем их отличие?

Вот что пишет сам разработчик в README на гитхабе:
“Basically: A Subject except without the ability to call onComplete or onError.”

То есть это просто Subject без методов onComplete и onError, даже исходный код классов почти одинаковый. Если мы вызовем на Subject эти методы, то он перестанет работать, так как получит терминальное состояние. Поэтому автор библиотеки решил, что стоит убрать эти методы, потому что те разработчики, которые не знают об этом свойстве Subject могут случайно вызвать их.

Вывод: единственное отличие RxRelay от Subject — это отсутствие двух методов onComplete и onError, чтобы разработчик не мог вызвать терминальный стейт.