Стримы в dart. Создание, подписки, обработка ошибок

От простого к сложному, для понимания потоков в dart. Создадим элементарный стрим через цикл for:


Stream myStream() async* {
  for (int i = 1; i < 10; i++) {
    yield i;
  }
}

Мы его можем прочитать (прослушать) через await for в методе myListen():


main() async {
  Stream stream = myStream();
  myListen(stream);
}

Stream myStream() async* {
  for (int i = 1; i < 10; i++) {
    yield i;
  }
}

myListen(stream) async {
  await for (var i in stream) {
    print(i); //1 2 3 4 5 6 7 8 9
  }
}

Также мы можем прослушать наш стрим через метод по умолчанию listen(), который есть в dart:


main() async {
  Stream stream = myStream();
  stream.listen((event) {
    print(event); //1 2 3 4 5 6 7 8 9
  });
}

Stream myStream() async* {
  for (int i = 1; i < 10; i++) {
    yield i;
  }
}

Создадим простейший стрим через цикл for и создадим метод summer(), который через await for переберет наш стрим и просуммирует всего его значения:


//тут async названии функции нужен только для того, чтоб можно было написать await
void main() async {
  Stream count = countStream(10);
  print(await summer(count)); //46
}

//создаем Stream через цикл for. Возвращаем значение через yield, поэтому и async*
Stream countStream(int to) async* {
  for (int i = 0; i < to; i++) {
    yield i;  
  }
}

//перебирать стрим через await for с ним можно только в асинхронной функции
Future summer(Stream stream) async{
  int sum = 1;
  await for (int value in stream) {
    sum += value;
  }
  return sum;
}

await for можно обернуть в try catch для обработки ошибок:


Future summer(Stream stream) async{
  int sum = 1;
  try {
    await for (int value in stream) {
      sum += value;
    }  
  } catch(e) {
    return -1;
  }
  
  return sum;
}

Создание стрима через класс Stream

Создадим стрим через класс Stream и его метод periodic. Назначим Duration равным 1 s.


void main() async {
  var stream = Stream.periodic(Duration(seconds: 1), (i) => i);
  stream.forEach((element) {
    print(element); //Бесконечно будет выводить 1 2 3 4 5 6 7 ... n
  });
}

Создадим стрим, в котором будем возвращать значения списка (список городов) с задержкой через Future.delayed. Через метод forEach нашего созданного стрима будет получать значение каждого элемента:


List fetchCityList() {
  return ['St. Petersburg', 'Bangkok', 'Beijing', 'Cairo', 'Delhi', 'Guangzhou', 'Jakarta', 'Kolkāta', 'Manila', 'Mexico City', 'Moscow', 'Mumbai', 'New York', 'São Paulo', 'Seoul', 'Shanghai', 'Tokyo'];
}

void main() async {
  var stream = loadCityStream();
  stream.forEach((element) {
    print(element);
  });
}

loadCityStream() async* {
  for (String name in fetchCityList()) {
    await Future.delayed(Duration(milliseconds: 400));
    yield name;
  }
}

Применяем цепочно методы map(), forech(), reduce(), then(), print() к стриму

В данном примере мы инициализируем 2 стрима.

В первом мы производим калькуляцию и печатаем каждый полученный элемент.

Во втором мы не печатаем каждую итерацию, а подсчитываем суммарное количество методом reduce(), а потом, после окончания стрима, через метод then(), печатаем итоговую сумму.

"Итерация" каждого стрима выполняется один за другим, именно поэтому итоговая сумма отображается(печатается) сразу после отображения последнего элемента (города)


List fetchCityList() {
  return ['St. Petersburg', 'Bangkok', 'Beijing', 'Cairo', 'Delhi', 'Guangzhou', 'Jakarta', 'Kolkāta', 'Manila', 'Mexico City', 'Moscow', 'Mumbai', 'New York', 'São Paulo', 'Seoul', 'Shanghai', 'Tokyo'];
}

int calculatePopulationOf(String city) {
  final populations = {'Tokyo': 37274000, 'Delhi': 32065760, 'Shanghai': 28516904, 'São Paulo': 22429800, 'Mexico City': 22085140, 'Cairo': 21750020, 'Beijing': 21333332, 'Mumbai': 20961472, 'Kolkāta': 15133888, 'Manila': 14406059, 'Guangzhou': 13964274, 'Moscow': 12640818, 'Jakarta': 11074811, 'Bangkok': 10899698, 'Seoul': 9975709, 'London': 9540576, 'New York': 8177025};
  return populations[city] ?? -1;
}

void main() async {
  print("CITY: POPULATION");    
  loadCityStream()
    .map((city) => "$city: " + calculatePopulationOf(city).toString())
    .forEach(print);

  loadCityStream()
    .map(calculatePopulationOf)
    .reduce((value, element) => value + element)
    .then((total) => print("Total known population: $total"));
}

Stream loadCityStream() async* {
  for (String name in fetchCityList()) {
    await Future.delayed(Duration(milliseconds: 800));
    yield name;
  }
}

Subscription and Broadcast

Мы вызывали стрим 2 раза, использую loasCityStream(). Давайте от этого попробуем избавиться. Изменим main():


void main() async {
  print("CITY: POPULATION");

  final stream = loadCityStream();
  stream
      .map((city) => "$city: " + calculatePopulationOf(city).toString())
      .forEach((n) {
    print("$n");
  });

  stream
      .map(calculatePopulationOf)
      .reduce((value, element) => value + element)
      .then((total) => print("Total known population: $total"));
}

И у нас получится ошибка: Uncaught Error: Bad state: Stream has already been listened to. Это получается из-за того, что наш стрим создается с одиночным подписчиком ( single subscription stream). Чтобы это изменить, добавим: final stream = loadCityStream().asBroadcastStream(); - Теперь наш стрим является broadcast. И ошибки уже не будет. А код в main() будет выглядеть так:


void main() async {
  print("CITY: POPULATION");

  final stream = loadCityStream().asBroadcastStream();
  stream
      .map((city) => "$city: " + calculatePopulationOf(city).toString())
      .forEach((n) {
    print("$n");
  });

  stream
      .map(calculatePopulationOf)
      .reduce((value, element) => value + element)
      .then((total) => print("Total known population: $total"));
}

Stream loadCityStream() async* {
  for (String name in fetchCityList()) {
    await Future.delayed(Duration(milliseconds: 800));
    yield name;
  }
}

subscription: pause, resume, cancel

Нашу подписку мы может поставить на паузу, продолжить или завершить. Обращаем внимание, что когда мы внедряем await Future.delayed и пока наш "код на паузе", то наш стрим всё равно идет. И только при указании subscription.pause(), мы ставим на паузу нашу подписку, а потом продолжаем subscription.resume() стрим с того места, с которого остановились


void main() async {
  final stream = loadCityStream();
  final subscription = stream.listen(
    (city) => print("loaded `$city`"),
    onDone: () => print("all cities loaded"),
  );
  print('delay 5 started');
  await Future.delayed(Duration(seconds: 5)); //мы эвейтим код, а стрим между тем идет все 5 секунд
  print('subscription pause');
  subscription.pause(); //мы ставим на паузу нашу подписку
  print('delay 5 started');
  await Future.delayed(Duration(seconds: 5)); //теперь у нас и код заэвейчен и стрим на паузе 
  print('subscription resume');
  subscription.resume();
  print('delay 5 started');
  await Future.delayed(Duration(seconds: 5));//снова идет стрим 5 секунд, т.к. подписка уже не на паузе
  print('subscription cancel');
  subscription.cancel(); //мы остановили нашу подписку, не дождавшись окончания стрима
}

Stream loadCityStream() async* {
  for (String name in fetchCityList()) {
    await Future.delayed(Duration(milliseconds: 1000));
    yield name;
  }
}

Обработка ошибок

В стримах (Stream) и во Future мы перехватываем ошибки с помощью catchError или через try/catch в await for.

Воспроизведем ошибку в стриме и обработаем её через catchError:


Stream failingStream() async* {
  throw Exception();
}

void main() async {
  failingStream()
    .forEach(print)
    .catchError((e) => print("Stream failed: $e")); //Stream failed: Exception
}

Обработаем ошибку через try/catch:


Stream failingStream() async* {
  throw Exception();
}

void main() async {
  try {
    await for (final city in failingStream()) {
      print(city);
    }
  } catch (e) {
    print("Caught error: $e"); // Caught error: Exception
  }
}

Источники:

https://dart.dev/tutorials/language/streams

https://www.kodeco.com/32851541-dart-futures-and-streams

Рубрика: dart

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *