Стримы в dart. Создание, подписки, обработка ошибок
От простого к сложному, для понимания потоков в dart. Создадим элементарный стрим через цикл for:
Stream myStream() async* {
for (int i = 1; i < 10; i++) {
yield i;
}
}
Мы его можем прочитать (прослушать) через await for в методе myListen():
main() async {
Stream stream = myStream();
myListen(stream);
}
Stream myStream() async* {
for (int i = 1; i < 10; i++) {
yield i;
}
}
myListen(stream) async {
await for (var i in stream) {
print(i); //1 2 3 4 5 6 7 8 9
}
}
Также мы можем прослушать наш стрим через метод по умолчанию listen(), который есть в dart:
main() async {
Stream stream = myStream();
stream.listen((event) {
print(event); //1 2 3 4 5 6 7 8 9
});
}
Stream myStream() async* {
for (int i = 1; i < 10; i++) {
yield i;
}
}
Создадим простейший стрим через цикл for и создадим метод summer(), который через await for переберет наш стрим и просуммирует всего его значения:
//тут async названии функции нужен только для того, чтоб можно было написать await
void main() async {
Stream count = countStream(10);
print(await summer(count)); //46
}
//создаем Stream через цикл for. Возвращаем значение через yield, поэтому и async*
Stream countStream(int to) async* {
for (int i = 0; i < to; i++) {
yield i;
}
}
//перебирать стрим через await for с ним можно только в асинхронной функции
Future summer(Stream stream) async{
int sum = 1;
await for (int value in stream) {
sum += value;
}
return sum;
}
await for можно обернуть в try catch для обработки ошибок:
Future summer(Stream stream) async{
int sum = 1;
try {
await for (int value in stream) {
sum += value;
}
} catch(e) {
return -1;
}
return sum;
}
Создание стрима через класс Stream
Создадим стрим через класс Stream и его метод periodic. Назначим Duration равным 1 s.
void main() async {
var stream = Stream.periodic(Duration(seconds: 1), (i) => i);
stream.forEach((element) {
print(element); //Бесконечно будет выводить 1 2 3 4 5 6 7 ... n
});
}
Создадим стрим, в котором будем возвращать значения списка (список городов) с задержкой через Future.delayed. Через метод forEach нашего созданного стрима будет получать значение каждого элемента:
List fetchCityList() {
return ['St. Petersburg', 'Bangkok', 'Beijing', 'Cairo', 'Delhi', 'Guangzhou', 'Jakarta', 'Kolkāta', 'Manila', 'Mexico City', 'Moscow', 'Mumbai', 'New York', 'São Paulo', 'Seoul', 'Shanghai', 'Tokyo'];
}
void main() async {
var stream = loadCityStream();
stream.forEach((element) {
print(element);
});
}
loadCityStream() async* {
for (String name in fetchCityList()) {
await Future.delayed(Duration(milliseconds: 400));
yield name;
}
}
Применяем цепочно методы map(), forech(), reduce(), then(), print() к стриму
В данном примере мы инициализируем 2 стрима.
В первом мы производим калькуляцию и печатаем каждый полученный элемент.
Во втором мы не печатаем каждую итерацию, а подсчитываем суммарное количество методом reduce(), а потом, после окончания стрима, через метод then(), печатаем итоговую сумму.
"Итерация" каждого стрима выполняется один за другим, именно поэтому итоговая сумма отображается(печатается) сразу после отображения последнего элемента (города)
List fetchCityList() {
return ['St. Petersburg', 'Bangkok', 'Beijing', 'Cairo', 'Delhi', 'Guangzhou', 'Jakarta', 'Kolkāta', 'Manila', 'Mexico City', 'Moscow', 'Mumbai', 'New York', 'São Paulo', 'Seoul', 'Shanghai', 'Tokyo'];
}
int calculatePopulationOf(String city) {
final populations = {'Tokyo': 37274000, 'Delhi': 32065760, 'Shanghai': 28516904, 'São Paulo': 22429800, 'Mexico City': 22085140, 'Cairo': 21750020, 'Beijing': 21333332, 'Mumbai': 20961472, 'Kolkāta': 15133888, 'Manila': 14406059, 'Guangzhou': 13964274, 'Moscow': 12640818, 'Jakarta': 11074811, 'Bangkok': 10899698, 'Seoul': 9975709, 'London': 9540576, 'New York': 8177025};
return populations[city] ?? -1;
}
void main() async {
print("CITY: POPULATION");
loadCityStream()
.map((city) => "$city: " + calculatePopulationOf(city).toString())
.forEach(print);
loadCityStream()
.map(calculatePopulationOf)
.reduce((value, element) => value + element)
.then((total) => print("Total known population: $total"));
}
Stream loadCityStream() async* {
for (String name in fetchCityList()) {
await Future.delayed(Duration(milliseconds: 800));
yield name;
}
}
Subscription and Broadcast
Мы вызывали стрим 2 раза, использую loasCityStream(). Давайте от этого попробуем избавиться. Изменим main():
void main() async {
print("CITY: POPULATION");
final stream = loadCityStream();
stream
.map((city) => "$city: " + calculatePopulationOf(city).toString())
.forEach((n) {
print("$n");
});
stream
.map(calculatePopulationOf)
.reduce((value, element) => value + element)
.then((total) => print("Total known population: $total"));
}
И у нас получится ошибка: Uncaught Error: Bad state: Stream has already been listened to. Это получается из-за того, что наш стрим создается с одиночным подписчиком ( single subscription stream). Чтобы это изменить, добавим: final stream = loadCityStream().asBroadcastStream(); - Теперь наш стрим является broadcast. И ошибки уже не будет. А код в main() будет выглядеть так:
void main() async {
print("CITY: POPULATION");
final stream = loadCityStream().asBroadcastStream();
stream
.map((city) => "$city: " + calculatePopulationOf(city).toString())
.forEach((n) {
print("$n");
});
stream
.map(calculatePopulationOf)
.reduce((value, element) => value + element)
.then((total) => print("Total known population: $total"));
}
Stream loadCityStream() async* {
for (String name in fetchCityList()) {
await Future.delayed(Duration(milliseconds: 800));
yield name;
}
}
subscription: pause, resume, cancel
Нашу подписку мы может поставить на паузу, продолжить или завершить. Обращаем внимание, что когда мы внедряем await Future.delayed и пока наш "код на паузе", то наш стрим всё равно идет. И только при указании subscription.pause(), мы ставим на паузу нашу подписку, а потом продолжаем subscription.resume() стрим с того места, с которого остановились
void main() async {
final stream = loadCityStream();
final subscription = stream.listen(
(city) => print("loaded `$city`"),
onDone: () => print("all cities loaded"),
);
print('delay 5 started');
await Future.delayed(Duration(seconds: 5)); //мы эвейтим код, а стрим между тем идет все 5 секунд
print('subscription pause');
subscription.pause(); //мы ставим на паузу нашу подписку
print('delay 5 started');
await Future.delayed(Duration(seconds: 5)); //теперь у нас и код заэвейчен и стрим на паузе
print('subscription resume');
subscription.resume();
print('delay 5 started');
await Future.delayed(Duration(seconds: 5));//снова идет стрим 5 секунд, т.к. подписка уже не на паузе
print('subscription cancel');
subscription.cancel(); //мы остановили нашу подписку, не дождавшись окончания стрима
}
Stream loadCityStream() async* {
for (String name in fetchCityList()) {
await Future.delayed(Duration(milliseconds: 1000));
yield name;
}
}
Обработка ошибок
В стримах (Stream) и во Future мы перехватываем ошибки с помощью catchError или через try/catch в await for.
Воспроизведем ошибку в стриме и обработаем её через catchError:
Stream failingStream() async* {
throw Exception();
}
void main() async {
failingStream()
.forEach(print)
.catchError((e) => print("Stream failed: $e")); //Stream failed: Exception
}
Обработаем ошибку через try/catch:
Stream failingStream() async* {
throw Exception();
}
void main() async {
try {
await for (final city in failingStream()) {
print(city);
}
} catch (e) {
print("Caught error: $e"); // Caught error: Exception
}
}