Dart в совокупности с пакетом Async обладает неплохим функционалом в части работы со стримами. Однако ограничения всё ещё есть. Чтобы сделать стримы более удобными, используется пакет RxDart.
ReactiveX (Rx) появился в 2010 году для .NET, а после был портирован почти на все современные языки программирования и стал стандартом. Версию для Dart опубликовали в 2015 году, и на данный момент она входит в число Flutter Favorite пакетов — её максимально поддерживает комьюнити.
Меня зовут Виталий, я Flutter Team Lead в Surf, и эта небольшая статья станет первой в цикле публикаций на тему RxDart.
Тезисы
Subject
— объект, на который можно подписаться и слушать переданные в него значения, аналогStreamController
в Dart.PublishSubject
—Subject
, который является аналогом стандартного широковещательного контроллераStreamController.broadcast()
.ReplaySubject
—Subject
, который хранит все переданные ранее значения и при подписке возвращает сразу все прошлые значения.BehaviorSubject
—Subject
, который хранит в себе последнеепереданное значение, и при подписке на этотSubject
, сразу возвращает слушателю это значение. Может быть инициализирован только с начальным значением.
Что предлагает Dart из «коробки»
Из «коробки» Dart предоставляет для работы со Stream класс StreamController
, который позволяет управлять стримами.
Существует два вида подписки:
single subscription — может быть только один слушатель, который гарантированно получит все сообщения, поступившие после подписки на стрим;
broadcast — слушателей может быть много, но они также будут получать сообщения, которые попадают в стрим после подписки.
Для StreamController.broadcast()
можно провести аналогию с радио — активным слушателям информация поставляется в «прямом эфире», и если подключиться к нему позже остальных, то послушать упущенное никак нельзя.
import 'dart:async';
void _firstListener(int value) => print('first: $value');
void _secondListener(int value) => print('second: $value');
void main() {
// создаем контроллер
final streamController = StreamController<int>.broadcast();
// добавляем первый слушатель, перед добавлением значения
streamController.stream.listen(_firstListener);
// добавляем значение
streamController.add(1);
// _firstListener выведет 'first: 1'
// добавляем второй слушатель
streamController.stream.listen(_secondListener);
// ничего не выведется
}
Что предлагает RxDart
Пакет добавляет три специализированных контроллера для работы со Stream — разновидности Subject
:
PublishSubject
ReplaySubject
BehaviorSubject
Рассмотрим их по отдельности.
PublishSubject
PublishSubject
— широковещательный («broadcast» или «hot») контроллер, аналог стандартного широковещательного контроллера StreamController.broadcast()
, о котором писали выше. Останавливаться здесь не будем.
ReplaySubject
ReplaySubject
— также широковещательный контроллер, который стоит использовать, если слушателю нужно передать все прошлые переданные события. Для новых слушателей он «проигрывает» все прошлые события начиная с первого.
Аналог — прямой эфир трансляции, который можно перематывать на ранние моменты.
import 'package:rxdart/subjects.dart';
// ... _firstListener и _secondListener
void main() {
// создаём replay subject
final replaySubject = ReplaySubject<int>();
replaySubject..add(1)..add(2)..add(3)..add(4);
// добавляем первый слушатель, перед добавлением значения
replaySubject.stream.listen(_firstListener);
// _firstListener выведет:
// first: 1
// first: 2
// first: 3
// first: 4
// добавляем второй слушатель
replaySubject.stream.listen(_secondListener);
// _secondListener выведет:
// second: 1
// second: 2
// second: 3
// second: 4
}
BehaviorSubject
BehaviorSubject
— широковещательный контроллер, который хранит в себе последнее значение или ошибку, и при подписке на этот стрим сразу возвращает слушателю последнее событие, переданное в контроллер.
Аналог — прямая трансляция без возможности перемотки. Когда подключаешься, начинаешь просмотр с самого последнего кадра.
import 'package:rxdart/subjects.dart';
// ... _firstListener и _secondListener
void main() {
// создаём behavior subject
final behaviorSubject = BehaviorSubject<int>();
// добавляем первый слушатель, перед добавлением значения
behaviorSubject.stream.listen(_firstListener);
// добавляем значение
behaviorSubject.add(1);
// _firstListener выведет 'first: 1'
// добавляем второй слушатель
behaviorSubject.stream.listen(_secondListener);
// _secondListener выведет 'second: 1'
}
Опционально можно передать слушателю исходное значение при подписке — с помощью конструктора BehaviorSubject<T>.seeded
, для Rx это более «нативный» способ объявления BehaviorSubject
.
import 'package:rxdart/subjects.dart';
// ... _firstListener и _secondListener
void main() {
// создаём behavior subject
final behaviorSubject = BehaviorSubject<int>.seeded(1);
// добавляем первый слушатель, перед добавлением значения
behaviorSubject.stream.listen(_firstListener);
// _firstListener выведет 'first: 1'
// добавляем второй слушатель
behaviorSubject.stream.listen(_secondListener);
// _secondListener выведет 'second: 1'
}
BehaviorSubject<T>.seeded
можно использовать, когда в стрим требуется передать исходное значение и слушателям необходимо «среагировать» на него.
Например, состояние корзины с товарами хранить в BehaviorSubject
, а на экране корзины связать вывод её содержимого напрямую с состоянием в условном классе CartService
.
import 'package:rxdart/subjects.dart';
class Product {
final String title;
const Product(this.title);
}
class CartState {
final List<Product> products;
const CartState({required this.products});
factory CartState.empty() => const CartState(products: []);
}
class CartService {
final _cartState = BehaviorSubject<CartState>.seeded(CartState.empty());
Stream<CartState> get cartStateStreamed => _cartState.stream;
void addProduct(Product product) {
_cartState.add(
CartState(
products: [
..._cartState.value.products,
product,
],
),
);
}
}
// где-то в приложении объявляем сервис для работы с корзиной
final service = CartService();
// подписываемся на состояние корзины, для обновления счётчика товаров,
// например в BottomAppBar, он будет обновляться при изменении состояния корзины
service.cartStateStreamed.listen((cartState) {
print('Число товаров: ${cartState.products.length}');
});
// добавляем товары
service..addProduct(const Product("Капуста"))..addProduct(const Product("Картошка"));
// чуть позже на экране содержимого корзины подписываемся на состояние и выводим названия товаров
service.cartStateStreamed.listen((cartState) {
print('Названия товаров: ${cartState.products.map((p) => p.title).join(',')}');
});
В некоторых случаях BehaviorSubject
может заменить ValueNotifier
, который не оповещает слушателей о своём последнем значении при подписке. BehaviorSubject
позволяет аналогично обратиться к последнему значению стрима через геттер BehaviorSubject.value
.
Заключение
Если вы хотите применять пакет RxDart в своих проектах и делать их более эффективными, не забывайте про документацию.
Также стоит ознакомиться с документацией для RxJS — пакета для JavaScript, актуального и для RxDart, делая скидку на отличный от стека Flutter язык. В этом пакете классная визуализация принципов Rx, так как Rx пакеты соблюдают общий контракт для всех методов и классов.
На этом не прощаемся — продолжим писать по теме и искать возможности повышения эффективности вашей работы.
Больше полезного про Flutter — в Telegram-канале Surf Flutter Team. Кейсы, лучшие практики, новости и вакансии в команду Flutter Surf в одном месте. Присоединяйтесь.