dart - 我可以多次监听 StreamController 的流(但一次不能订阅多个)吗?

标签 dart

我在 dart 中使用 StreamController。我希望能够停止监听 Controller 的流,然后再次开始监听。我不需要同时有多个听众。我只是要求我监听流,然后停止监听流,然后再建立一个新的监听器。

我创建了一个最小的示例,我尝试取消初始订阅,然后再次收听流,但仍然收到错误状态错误。

import 'dart:async';

void main(List<String> arguments) async {
  var controller = StreamController<String>();
  var sub = controller.stream.listen((event) { });
  await sub.cancel();
  controller.stream.listen((event) { }); // throws bad state
}

最佳答案

StreamController 类创建一个单订阅流(您只能收听一次)或一个广播流(如果使用 StreamController.broadcast 创建),它可以多次聆听。

对于您所描述的用途,您需要广播变体。

您可能希望避免在该 Controller 没有监听器时发送事件(即使没有监听器,广播流也可以发出事件,它们会被广播到空白中)。当取消最后一个监听器并添加第一个(新)监听器时,将调用广播流 Controller 的 onCancelonListen 回调。

StreamController.broadcast Controller 不会阻止您同时拥有多个监听器,但您只要小心就可以避免这种情况。

示例:

import 'dart:async';

void main(List<String> arguments) async {
  var controller = StreamController<String>.broadcast();
  controller.onListen = () {
    print("Active");
  };
  controller.onCancel = () {
    print("Inactive");
  };
  var sub = controller.stream.listen((event) { }); // "Active"
  await sub.cancel(); // "Inactive"
  controller.stream.listen((event) { }); // "Active"
}

如果您确实想坚持“一次一个监听器”,您可以将流包装在类似以下内容的内容中:

import "dart:async";

/// Allows a stream to be listened to multiple times.
///
/// Returns a new stream which has the same events as [source],
/// but which can be listened to more than once.
/// Only allows one listener at a time, but when a listener
/// cancels, another can start listening and take over the stream.
///
/// If the [source] is a broadcast stream, the listener on
/// the source is cancelled while there is no listener on the
/// returned stream.
/// If the [source] is not a broadcast stream, the subscription
/// on the source stream is maintained, but paused, while there
/// is no listener on the returned stream.
///
/// Only listens on the [source] stream when the returned stream
/// is listened to.
Stream<T> resubscribeStream<T>(Stream<T> source) {
  MultiStreamController<T>? current;
  StreamSubscription<T>? sourceSubscription;
  bool isDone = false;
  void add(T value) {
    current!.addSync(value);
  }

  void addError(Object error, StackTrace stack) {
    current!.addErrorSync(error, stack);
  }

  void close() {
    isDone = true;
    current!.close();
    current = null;
    sourceSubscription = null;
  }

  return Stream<T>.multi((controller) {
    if (isDone) {
      controller.close(); // Or throw StateError("Stream has ended");
      return;
    }
    if (current != null) throw StateError("Has listener");
    current = controller;
    var subscription = sourceSubscription ??=
        source.listen(add, onError: addError, onDone: close);
    subscription.resume();
    controller
      ..onPause = subscription.pause
      ..onResume = subscription.resume
      ..onCancel = () {
        current = null;
        if (source.isBroadcast) {
          sourceSubscription = null;
          return subscription.cancel();
        }
        subscription.pause();
        return null;
      };
  });
}

关于dart - 我可以多次监听 StreamController 的流(但一次不能订阅多个)吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70558849/

相关文章:

dart - 使用 custom_element_apigen : does not working 到 Dart 的纸质数据表端口

animation - Flutter -Loader 动画中的缩放过渡

exit - 我怎样才能退出我的 Dart (vm) 程序?

json - 在 URL 参数中传递用户名和密码(Dart 和 Flutter)

dart - 为什么集合不包含点

dart - 如何使 FadeInImage 变成圆形?

android-studio - 在flutter上对PopupMenuButton返回值方法使用setState方法

json - Flutter JSON 重复索引

firebase - flutter :如何删除gridview单元格的背景?

firebase - 在Flutter中从Firebase FireCloud接收数据时,不会执行StreamBuilder中的快照代码