订阅者可以在不关闭流的情况下处理在Dart流中引发的异常吗?

问题描述

我无法理解的简短示例:

Stream<int> getNumbersWithException() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
    if (i == 3) throw Exception();
  }
}

用法

getNumbersWithException()
    .handleError((x) => print('Exception caught for $x'))
    .listen((event) {
  print('Observed: $event');
});

这将在输出3处停止:

Observed: 0
Observed: 1
Observed: 2
Observed: 3
Exception caught for Exception: foo

根据文档(https://dart.dev/tutorials/language/streams和(https://api.dart.dev/stable/2.9.1/dart-async/Stream/handleError.html),这是预期的,因为抛出异常会自动关闭流。

  1. 这是否意味着在流中处理异常的正确方法(即订阅可以在这种事件中长期存在)是处理流中异常的正确方法?不可能从外面这样做吗?
  2. 广播流是否一样?
  3. 如果我以错误的方式思考这个问题,那么开始正确思考的一些指示是什么?

我目前正在将流视为异步数据事件的来源,这些事件有时可能是错误事件。从文档和示例来看,这一切看起来都很整洁,但是我认为要处理错误并继续观察数据流是正常的用例。我很难编写代码来做到这一点。但是,我可能正在解决这个错误。任何见解将不胜感激。


编辑:我可以补充一点,我已经尝试了各种类似使用流转换器的方法,但结果相同:

var transformer = StreamTransformer<int,dynamic>.fromHandlers(
  handleData: (data,sink) => sink.add(data),handleError: (error,stackTrace,sink) =>
      print('Exception caught for $error'),handleDone: (sink) => sink.close(),);
getNumbersWithException().transform(transformer).listen((data) {
  print('Observed: $data');
});

此外,listen()一个可选参数cancelOnError,看起来不错,但认为false,所以这里没有雪茄。

解决方法

生成器方法

Stream<int> getNumbersWithException() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
    if (i == 3) throw Exception();
  }
}

将在您引发异常时终止。 throw正常工作,它不会直接将异常添加到流中。因此,它通过循环和方法主体传播出去,直到整个方法主体以抛出的异常结束为止。 此时,未处理的异常将添加到流中,然后由于主体已结束而关闭流。

因此,问题不在于处理,而在于流的生成。 您确实必须在本地处理该错误,以避免它终止流生成主体。

您不能在throw方法中使用async*向流中添加多个错误,而错误将是流的最后一件事。

实际上会发出多个错误的有效手段是产生异常

  if (i == 3) yield* () async* { throw Exception(); }();
  // or:      yield* Stream.fromFuture(Future.error(Exception());

这将直接向生成的流中发出异常,而无需在本地抛出异常并结束生成器方法主体。