Dart в совокупности с пакетом Async обладает неплохим функционалом в части работы со стримами. Однако ограничения всё ещё есть. Чтобы сделать стримы более удобными, используется пакет RxDart.

ReactiveX (Rx) появился в 2010 году для .NET, а после был портирован почти на все  современные языки программирования и стал стандартом. Версию для Dart опубликовали в 2015 году, и на данный момент она входит в число Flutter Favorite пакетов — её максимально поддерживает комьюнити.

Меня зовут Виталий, я Flutter Team Lead в Surf, и эта небольшая статья станет первой в цикле публикаций на тему RxDart.

Тезисы

  • Subject — объект, на который можно подписаться и слушать переданные в него значения, аналог StreamController в Dart.

  • PublishSubjectSubject, который является аналогом стандартного широковещательного контроллера StreamController.broadcast().

  • ReplaySubjectSubject, который хранит все переданные ранее значения и при подписке возвращает сразу все прошлые значения.

  • BehaviorSubjectSubject, который хранит в себе последнеепереданное значение, и при подписке на этот Subject, сразу возвращает слушателю это значение. Может быть инициализирован только с начальным значением.

Что предлагает Dart из «коробки»

Из «коробки» Dart предоставляет для работы со Stream класс StreamController, который позволяет управлять стримами. 

Существует два вида подписки:

  • single subscription — может быть только один слушатель, который гарантированно получит все сообщения, поступившие после подписки на стрим;

  • broadcast — слушателей может быть много, но они также будут получать сообщения, которые попадают в стрим после подписки.

Для StreamController.broadcast() можно провести аналогию с радио — активным слушателям информация поставляется в «прямом эфире», и если подключиться к нему позже остальных, то послушать упущенное никак нельзя.

import 'dart:async';

void _firstListener(int value) => print('first: $value');
void _secondListener(int value) => print('second: $value');

void main() {
  // создаем контроллер
  final streamController = StreamController<int>.broadcast();
  
  // добавляем первый слушатель, перед добавлением значения
  streamController.stream.listen(_firstListener);
  
  // добавляем значение
  streamController.add(1);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  streamController.stream.listen(_secondListener);
  // ничего не выведется
}

Что предлагает RxDart

Пакет добавляет три специализированных контроллера для работы со Stream — разновидности Subject

  • PublishSubject

  • ReplaySubject

  • BehaviorSubject

Рассмотрим их по отдельности.

PublishSubject

PublishSubject — широковещательный («broadcast» или «hot») контроллер, аналог стандартного широковещательного контроллера StreamController.broadcast(), о котором писали выше. Останавливаться здесь не будем. 

ReplaySubject

ReplaySubjectтакже широковещательный контроллер, который стоит использовать, если слушателю нужно передать все прошлые переданные события. Для новых слушателей он «проигрывает» все прошлые события начиная с первого. 

Аналог  — прямой эфир трансляции, который можно перематывать на ранние моменты.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём replay subject
  final replaySubject = ReplaySubject<int>();
  
  replaySubject..add(1)..add(2)..add(3)..add(4);
  
  // добавляем первый слушатель, перед добавлением значения
  replaySubject.stream.listen(_firstListener);
  // _firstListener выведет:
  // first: 1 
  // first: 2 
  // first: 3 
  // first: 4
  
  // добавляем второй слушатель
  replaySubject.stream.listen(_secondListener);
  // _secondListener выведет:
  // second: 1 
  // second: 2 
  // second: 3 
  // second: 4
}

BehaviorSubject

BehaviorSubject — широковещательный контроллер, который хранит в себе последнее значение или ошибку, и при подписке на этот стрим сразу возвращает слушателю последнее событие, переданное в контроллер. 

Аналог — прямая трансляция без возможности перемотки. Когда подключаешься, начинаешь просмотр с самого последнего кадра.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём behavior subject
  final behaviorSubject = BehaviorSubject<int>();
  
  // добавляем первый слушатель, перед добавлением значения
  behaviorSubject.stream.listen(_firstListener);
  
  // добавляем значение
  behaviorSubject.add(1);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  behaviorSubject.stream.listen(_secondListener);
  // _secondListener выведет 'second: 1'
}

Опционально можно передать слушателю исходное значение при подписке — с помощью конструктора BehaviorSubject<T>.seeded, для Rx это более «нативный» способ объявления BehaviorSubject.

import 'package:rxdart/subjects.dart';

// ... _firstListener и _secondListener

void main() {
  // создаём behavior subject
  final behaviorSubject = BehaviorSubject<int>.seeded(1);
  
  // добавляем первый слушатель, перед добавлением значения
  behaviorSubject.stream.listen(_firstListener);
  // _firstListener выведет 'first: 1'
  
  // добавляем второй слушатель
  behaviorSubject.stream.listen(_secondListener);
  // _secondListener выведет 'second: 1'
}

BehaviorSubject<T>.seeded можно использовать, когда в стрим требуется передать исходное значение и слушателям необходимо «среагировать» на него.

Например, состояние корзины с товарами хранить в BehaviorSubject, а на экране корзины связать вывод её содержимого напрямую с состоянием в условном классе CartService.

import 'package:rxdart/subjects.dart';

class Product {
  final String title;

  const Product(this.title);
}

class CartState {
  final List<Product> products;

  const CartState({required this.products});
	
  factory CartState.empty() => const CartState(products: []);
}

class CartService {
  final _cartState = BehaviorSubject<CartState>.seeded(CartState.empty());

  Stream<CartState> get cartStateStreamed => _cartState.stream;

  void addProduct(Product product) {
    _cartState.add(
      CartState(
        products: [
          ..._cartState.value.products,
          product,
        ],
      ),
    );
  }
}

// где-то в приложении объявляем сервис для работы с корзиной
final service = CartService();

// подписываемся на состояние корзины, для обновления счётчика товаров, 
// например в BottomAppBar, он будет обновляться при изменении состояния корзины
service.cartStateStreamed.listen((cartState) {
  print('Число товаров: ${cartState.products.length}');
});

// добавляем товары
service..addProduct(const Product("Капуста"))..addProduct(const Product("Картошка"));

// чуть позже на экране содержимого корзины подписываемся на состояние и выводим названия товаров
service.cartStateStreamed.listen((cartState) {
  print('Названия товаров: ${cartState.products.map((p) => p.title).join(',')}');
});

В некоторых случаях BehaviorSubject может заменить ValueNotifier, который не оповещает слушателей о своём последнем  значении при подписке. BehaviorSubject позволяет аналогично обратиться к последнему значению стрима через геттер BehaviorSubject.value.

Заключение

Если вы хотите применять пакет RxDart в своих проектах и делать их более эффективными, не забывайте про документацию.

Также стоит ознакомиться с документацией для RxJS — пакета для JavaScript, актуального и для RxDart, делая скидку на отличный от стека Flutter язык. В этом пакете классная визуализация принципов Rx, так как Rx пакеты соблюдают общий контракт для всех методов и классов.

На этом не прощаемся — продолжим писать по теме и искать возможности повышения эффективности вашей работы.

Больше полезного про Flutter — в Telegram-канале Surf Flutter Team. Кейсы, лучшие практики, новости и вакансии в команду Flutter Surf в одном месте. Присоединяйтесь.

Комментарии (0)