Асинхронное программирование: потоки данных


Содержание



Что важно:


  • Потоки обеспечивают асинхронную последовательность данных.
  • Последовательности данных содержат пользовательские события и данные, считываемые из файлов.
  • Поток можно обработать с помощью await for или listen() из Stream API.
  • Потоки предоставляют способ реагирования на ошибки.
  • Существует два типа потоков: потоки-подписки (single subscription) и широковещательные (broadcast).

Асинхронное программирование в Dart характеризуется классами Future и Stream .


Future представляет собой отложенное вычисление. Если нормальная функция возвращает результат, асинхронная функция возвращает объект Future (future), который в конечном итоге будет содержать результат. future вернет результат по завершению операции.


Поток — это последовательность асинхронных событий. Это похоже на асинхронный итерируемый объект (Iterable), где вместо получения следующего события, когда вы его запрашиваете, поток сообщает о событии, когда оно будет готово.


Получение событий потока


Потоки можно создавать разными способами, что является темой другой статьи, но все они могут использоваться одним и тем же способом: асинхронный цикл for (обычно вызываемый await for) выполняет итерацию событий потока, как цикл for итерацию по коллекции. Например:


Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Этот код просто получает целочисленное событие из потока, складывает их и возвращает (future) сумму. Когда тело цикла заканчивается, функция приостанавливается до следующего события или завершения потока.


Функция помечается ключевым словом async, которое требуется при использовании цикла await for.


В следующем примере (на DartPad) выполняется проверка предыдущего кода путем создания простого потока целых чисел с помощью функции с async* (прим. генератор):


Код примера
// Copyright (c) 2015, the Dart project authors.
// Please see the AUTHORS file for details.
// All rights reserved. Use of this source code is governed
// by a BSD-style license that can be found in the LICENSE file.

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

События с ошибкой


Потоки выполнены, когда в них больше нет событий, и код, получающий события, уведомляется об этом так же, как он уведомляется о поступлении нового события. При чтении событий с помощью await for цикл заканчивается после завершения потока.


В некоторых случаях ошибка возникает до завершения потока; возможно, сбой сети при извлечении файла с удаленного сервера или код, создающий события, содержит ошибку, кто-то должен знать об этом.


Потоки могут сообщать о событии с ошибкой так же, как о событиях данных. Большинство потоков останавливаются после первой ошибки, но возможны потоки, отдающие более одной ошибки, и потоки, сообщающие о данных после события ошибки. В данном документе мы обсуждаем только потоки, которые отдают не более одной ошибки.


При чтении потока с помощью await for ошибка выдается оператором цикла. Это также завершает цикл. Вы можете поймать ошибку с помощью try-catch. В следующем примере (на DartPad) возникает ошибка, если итератор цикла равен 4:


Код примера
// Copyright (c) 2015, the Dart project authors.
// Please see the AUTHORS file for details.
// All rights reserved. Use of this source code is governed
// by a BSD-style license that can be found in the LICENSE file.

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (var value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw new Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

Работа с потоками


Класс Stream содержит ряд вспомогательных методов, которые могут выполнять общие операции над потоком, похожие на методы Iterable. Например, можно найти наименьшее положительное целое число в потоке, используя lastWhere() из Stream API.


Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

Типы потоков


Потоки-подписки


Наиболее распространенный тип потока содержит последовательность событий, которые являются частями большего целого. События должны быть доставлены в правильном порядке без пропуска любого из них. Это тип потока, который вы получаете при чтении файла или получении веб-запроса.


Такой поток можно слушать только один раз. Прослушивание позже может означать пропуск начальных событий, и тогда остальная часть потока не имеет смысла. Когда вы начнете слушать, данные будут извлечены и предоставлены кусками.


Широковещательные потоки


Другой тип потока предназначен для отдельных сообщений, которые могут обрабатываться по одному. Такой поток можно использовать, например, для событий мыши в браузере.


Вы можете начать слушать такой поток в любое время, и вы получите события, произошедшие во время прослушивания. Поток могу слушать несколько слушателей. Вы можете снова начать слушать события потока после отмены предыдущей подписки.


Методы, обрабатывающие поток


Следующие методы в Stream<Т> обрабатывают поток и возвращают результат:


Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

Все эти функции, кроме drain() и pipe(), соответствуют аналогичной функции в Iterable. Каждый из них может быть легко написан с помощью асинхронной функции с циклом await for (или просто с помощью одного из других методов). Например, некоторые реализации могут быть такими:
All of these functions, except drain() and pipe(), correspond to a similar function on Iterable. Each one can be written easily by using an async function with an await for loop (or just using one of the other methods). For example, some implementations could be:


Future<bool> contains(Object needle) async {
  await for (var event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (var event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await this.forEach(result.add);
  return result;
}

Future<String> join([String separator = ""]) async =>
    (await this.toList()).join(separator);

(Актуальная реализация немного сложнее, но в основном по историческим причинам.)


Методы, изменяющие поток


Следующие методы в Stream возвращают новый поток на основе исходного потока. Каждый из них ждет, пока кто-то прослушает новый поток, прежде чем слушать исходный.


Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<R> retype<R>();
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

Выше указанные методы соответствуют аналогичным методам в Iterable, которые преобразуют итерируемый объект в другой итерируемый объект. Все это можно легко записать с помощью асинхронной функции с циклом await for.


Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next) equals]);

Функции asyncExpand() и asyncMap() похожы на функции expand() и map(), но позволяют аргументу функции быть асинхронной функцией. distinct() функции не существует в Iterable, но ее можно реализовать.


Stream<T> handleError(Function onError, {bool test(error)});
Stream<T> timeout(Duration timeLimit,
    {void Function(EventSink<T> sink) onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

Последние три функции более спецэфичны. Они включают обработку ошибок, которую не может выполнить цикл await for, так как первая же ошибка завершит цикл и его подписку на поток. С этим ничего не поделать. Вы можете исползовать handleError() для удаления ошибок из потока перед его использованием в цикле await for.


Функция transform()


Функция transform() предназначена не только для обработки ошибок; она является более обобщенным "map" для потоков. Для нормального map требуется одно значение для каждого входящего события. Однако, особенно для потоков ввода-вывода, для создания выходного события может потребоваться несколько входящих событий. StreamTransformer может помочь с этим. Например, дешифраторы, как Utf8Decoder, являются трансформаторами. Трансформатор требует только одну функцию bind(), которая может быть легко реализована через асинхронную функцию.


Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (var event in streamWithoutErrors) {
    yield convert(event);
  }
}

Чтение и декодирование файла


Следующий код считывает файл и выполняет два преобразования в потоке. Сначала он преобразует данные из UTF8, а затем пропускает их через LineSplitter. Все строки печатаются, кроме тех, которые начинаются с хэштега (#).


import 'dart:convert';
import 'dart:io';

Future<void> main(List<String> args) async {
  var file = File(args[0]);
  var lines = file
      .openRead()
      .transform(utf8.decoder)
      .transform(LineSplitter());
  await for (var line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

Метод listen()


Метод listen() — это “низкоуровневый” метод, все остальные функции функция определяются через listen().


StreamSubscription<T> listen(void Function(T event) onData,
    {Function onError, void Function() onDone, bool cancelOnError});

Чтобы создать новый тип потока, можно просто отнаследовать класс Stream и реализовать метод listen(), все другие методы Stream вызывают listen() для работы.


Метод listen() позволяет начать прослушивание потока. Пока вы этого не сделаете, поток является инертным объектом, описывающим, какие события вы хотите слушать. При прослушивании возвращается объект StreamSubscription, представляющий активный поток, создающий события. Это похоже на то, как Iterable — это просто коллекция объектов, а итератор — это тот, кто делает фактическую итерацию.


Подписку на поток можно остановить, возобновить ее после паузы и полностью отменить. Можно задать колбэки, которые будут вызываться для каждого события данных или события ошибки, а также при закрытии потока.




Что еще почитать?


Dart 2. Асинхронное программирование: futures

Комментарии (4)


  1. ReklatsMasters
    11.03.2019 12:50

    Я не программирую на Dart, а пока только интересуюсь. В связи с чем у меня вопрос, а зачем, собственно, придумывать новую сущность Future, когда можно было использовать уже давно работающий в js Promise API? Может, mraleph


    1. mraleph
      11.03.2019 13:19

      > а зачем, собственно, придумывать новую сущность Future, когда можно было использовать уже давно работающий в js Promise API

      Во-первых, Dart совершенно самостоятельный язык, зачем ему смотреть на то как в JS что-то сделано?

      А, во-вторых, посмотрите на даты релиза Dart и Promise API в JS. Внезапно обнаружится, что Dart появился где-то в 2011 (начал разрабатываться где-то в 2010м) — и с самого начала имел Future, и Promise API в JS еще даже дизайнить не начали в то время.


    1. dlc
      11.03.2019 13:20

      Потому как термин Future более ранний по отношению к термину Promise. К тому же, Dart в большей степени смотрит на Java, где Future, а не на JS, где Promise.


    1. sharpfellow Автор
      11.03.2019 13:57

      Вряд ли кто-то что-то придумывал в этом случае. Идеи Future и Promise давно не новы

      Немного истории
      image