Hola, Amigos! На связи Павел Гершевич, Mobile Team Lead агентства продуктовой разработки Amiga и соавтор телеграм-канала Flutter.Много. Как известно, BLoC — один из самых популярных способов для управления состоянием. Его преимущество в том, что мы можем управлять не только самим состоянием, но и теми данными, которые в него попадают.

В этой статье мы разберемся с такими вопросами:
  1. Что такое Event Transformers? Для чего они нужны?
  2. Как их применять?

И попробуем сделать 2 кастомных трансформера различной сложности.

Немного теории


Сначала посмотрим на то, как устроен BLoC:



Кратко это можно описать так — BLoC состоит из двух потоков данных, которые объединяются при помощи обработки событий из одного и добавления данных в другой. Эти потоки нужны для событий BLoC и его состояния.

Чтобы контролировать поток состояний, нам нужно контролировать поток событий и их обработку. Если с обработкой нет ничего сложного, так как это обычные функции, то вот с потоком событий могут возникнуть сложности.
В нем мы можем управлять порядком, в котором будут вызываться функции для обработки, и тем самым знать, когда изменится состояние. Чтобы это делать, существуют специальные функции для преобразования — трансформеры.

Как устроен трансформер?


Если говорить коротко, то функция, которую мы применяем, возвращает еще одну, которая как раз и обрабатывает поток.

(events, mapper) => events.map((e) => mapper(e));

На вход она получает поток событий и функцию для обработки данных. Уже внутри нее идет преобразование.

Как добавить трансформер к событиям?


Event Transformer можно применить как ко всем событиям в BLoC, так и только к определенным, передав функцию в метод on из BLoC:
on<MyEvent>(
  (event, emit) {
    …
  },
  transformer: sequential(),
);

А есть ли готовые трансформеры?


Разработчики библиотеки bloc предоставляют дополнительную библиотеку bloc_concurrency, в которую входят 4 уже готовых трансформера. Давайте посмотрим на них по порядку.



Первый — concurrent. Он нужен для обработки событий в тот момент, когда они поступили.

Второй — sequential. Он позволяет выполнять обработку событий последовательно.

Например, при бесконечном скролле, пока мы загружаем одну страницу, вторая страница не будет обрабатываться.

Третий — droppable. С его помощью можно отменить выполнение других событий, пока одно обрабатывается.

Например, кнопка для сохранения формы. Если пользователь нажал ее несколько раз, то обработано будет только первое нажатие.

Четвертый — restartable. Он позволяет отменить обработку текущего события, если пришло новое.

Например, для тяжеловесных операций, которые могут быть перезапущены для экономии ресурсов, или для поиска. Пользователь вводит 10 символов один за другим, а мы отбрасываем, пока он не прекратит ввод.

Пишем кастомные трансформеры


Иногда таких трансформеров может не хватать, особенно, если нам нужно сделать какие-то более сложные вещи. Например, добавить debounce к какому-либо событию.

Создаем трансформер в виде функции


Давайте попробуем написать такой трансформер сами. Для этого уже есть подготовленный тип данных — EventTransformer, но не нужно забывать, что он принимает в себя какой-либо класс события, поэтому лучше добавлять в него дженерик. И для простоты работы мы дополнительно будем передавать длительность уже внутрь трансформера.

EventTransformer<T> debounced<T>({required Duration duration}) {
  return (events, mapper) => events
      .debounce((_) => TimerStream(true, duration))
      .map(mapper);
}

Итого, используя метод debounce из библиотеки rxdart, мы создали свой первый простой EventTransformer, который можем применить в нашем BLoC.

on<MyEvent>(
  (event, emit) {
    …
  },
  transformer: debounced(
    duration: const Duration(milliseconds: 500),
  ),
);

Создаем более сложный трансформер


Таких простых трансформеров может быть все равно недостаточно. Например, если нам нужно сделать буферизацию событий с изначальной задержкой. Давайте рассмотрим именно такой случай.



Допустим, у нас есть вот такое событие:

class AddToCart extends CartEvent {
  final Item item;
  …
}

Сначала нужно вернуть то же событие, которое поступает на вход. Для этого преобразуем его для работы со списком.

class AddToCart extends CartEvent {
  final List<Item> items;
  …
}

И создадим метод, который будет преобразовывать несколько событий в одно:

AddToCart fromBuffer(List<AddToCart> events) {
  return AddToCart(
    items: events
        .map((e) => e.items)
        .expand()
        .toList(),
  );
}

Такие методы могут быть более сложными, но они обязательно должны принимать список событий и возвращать всего одно, причем того же типа.

Можно добавить тип данных под это:

typedef EventFlatMapper<T> = T Function(List<T>);

Теперь у нас все готово, и мы продолжаем с создания дополнительного класса, в котором и будем обрабатывать наш поток событий. Для этого в Dart уже есть специальный абстрактный класс — StreamTransformerBase.

class _BufferedEventTransformer<T> extends StreamTransformerBase<T, T> {
  final EventMapper<T> mapper;
  final EventFlatMapper<T> flatMapper;
  final Duration duration;

  _BufferedEventTransformer({
    required this.mapper,
    required this.flatMapper,
    required this.duration,
  });

  …
}

В наш класс мы передаем mapper, который обрабатывает наше событие из BLoC, функцию для раскладывания flatMapper и длительность минимальной задержки duration.

А как же нам все-таки обрабатывать поток данных? Тут все просто — есть метод bind, от которого мы и оттолкнемся.

@override
Stream<T> bind(Stream<T> stream) {}

По идее, тут мы можем делать абсолютно то же самое, что позволяют простые трансформеры. Но мы пишем что-то более сложное.

Оглянемся на те трансформеры, которые есть в bloc_concurrency, а именно на droppable. Давайте посмотрим, как там организован метод bind:

@override
Stream<T> bind(Stream<T> stream) {
  late StreamSubscription<T> subscription;
  StreamSubscription<T>? mappedSubscription;

  final controller = StreamController<T>(
    onCancel: () async {
      await mappedSubscription?.cancel();
      return subscription.cancel();
    },
    sync: true,
  );

  subscription = stream.listen(
    (data) {
      if (mappedSubscription != null) return;
      final Stream<T> mappedStream;

      mappedStream = mapper(data);
      mappedSubscription = mappedStream.listen(
        controller.add,
        onError: controller.addError,
        onDone: () => mappedSubscription = null, 
      );
    },
    onError: controller.addError,
    onDone: () => mappedSubscription ?? controller.close(),
  );

  return controller.stream;
}

Можно увидеть, создается подписка на основной поток данных, а внутри нее сначала проверяется, завершилась ли обработка события — если нет, то следующее отбрасывается. Это очень похоже на то, что необходимо при буферизации. Поэтому работать будем с этим кодом, а изменим только подписку на основной Stream.

Первое, что нужно сделать — добавить задержку перед обработкой первого события. Она же будет и максимальной задержкой между обработкой событий. Сделаем это при помощи rxdart: буферизуем по времени, обрабатываем нашим flatMapper и отбрасываем пустые события.

subscription = stream
  .bufferTime(duration)
  .map<T?>((e) {
    if (e.isEmpty) return null;
    return flatMapper(e);
  })
  .whereType<T>()
  .listen(
    (data) {
      …
    },
    onError: controller.addError,
    onDone: () => mappedSubscription ?? controller.close(),
  );

Теперь все события, которые добавляются за время задержки, будут обработаны вместе. Но у нас до сих пор отбрасываются события, а не улетают в буфер. Для этого мы перенесем проверку выше в bufferTest, обработаем flatMapper и отбросим пустые события.

subscription = stream
  .bufferTime(duration)
  .map<T?>((e) {
    if (e.isEmpty) return null;
    return flatMapper(e);
  })
  .whereType<T>()
  .bufferTest((_) => mappedSubscription != null)
  .map<T?>((e) {
    if (e.isEmpty) return null;
    return flatMapper(e);
  })
  .whereType<T>()
  .listen(
    (data) {
      …
    },
    onError: controller.addError,
    onDone: () => mappedSubscription ?? controller.close(),
  );

Единственное, что осталось — убрать проверку на то, что mappedSubscription не закончил свою работу из прослушивания. И все, наш трансформер готов к тому, чтобы по нему сделать метод для добавления в BLoC:

EventTransformer<T> buffered<T>(
  EventFlatMapper<T> flatMapper,
  Duration duration,
) {
  return (events, mapper) => events.transform(
    _BufferedEventTransformer(
      mapper: mapper,
      flatMapper: flatMapper,
      duration: duration,
    ),
  );
}

Теперь мы можем добавлять буферизацию к нашим событиям, чтобы ускорить выполнение некоторых ресурсоемких операций.

Если есть вопросы — буду ждать вас в комментариях. А еще подписывайтесь на телеграм-канал про Flutter-разработку — я часто там пишу и делюсь полезными инсайтами и новостями из мира мобильной разработки.

Комментарии (1)


  1. Odyss
    24.12.2024 11:38

    Спасибо, было интересно! У нас как раз похожие проблемы с последовательными асинхронными ивентами, будем учитывать что для этого есть трансформеры.