我在 dart 中使用 StreamController。我希望能够停止监听 Controller 的流,然后再次开始监听。我不需要同时有多个听众。我只是要求我监听流,然后停止监听流,然后再建立一个新的监听器。
我创建了一个最小的示例,我尝试取消初始订阅,然后再次收听流,但仍然收到错误状态错误。
import 'dart:async';
void main(List<String> arguments) async {
var controller = StreamController<String>();
var sub = controller.stream.listen((event) { });
await sub.cancel();
controller.stream.listen((event) { }); // throws bad state
}
最佳答案
StreamController
类创建一个单订阅流(您只能收听一次)或一个广播流(如果使用 StreamController.broadcast
创建),它可以多次聆听。
对于您所描述的用途,您需要广播变体。
您可能希望避免在该 Controller 没有监听器时发送事件(即使没有监听器,广播流也可以发出事件,它们会被广播到空白中)。当取消最后一个监听器并添加第一个(新)监听器时,将调用广播流 Controller 的 onCancel
和 onListen
回调。
StreamController.broadcast
Controller 不会阻止您同时拥有多个监听器,但您只要小心就可以避免这种情况。
示例:
import 'dart:async';
void main(List<String> arguments) async {
var controller = StreamController<String>.broadcast();
controller.onListen = () {
print("Active");
};
controller.onCancel = () {
print("Inactive");
};
var sub = controller.stream.listen((event) { }); // "Active"
await sub.cancel(); // "Inactive"
controller.stream.listen((event) { }); // "Active"
}
如果您确实想坚持“一次一个监听器”,您可以将流包装在类似以下内容的内容中:
import "dart:async";
/// Allows a stream to be listened to multiple times.
///
/// Returns a new stream which has the same events as [source],
/// but which can be listened to more than once.
/// Only allows one listener at a time, but when a listener
/// cancels, another can start listening and take over the stream.
///
/// If the [source] is a broadcast stream, the listener on
/// the source is cancelled while there is no listener on the
/// returned stream.
/// If the [source] is not a broadcast stream, the subscription
/// on the source stream is maintained, but paused, while there
/// is no listener on the returned stream.
///
/// Only listens on the [source] stream when the returned stream
/// is listened to.
Stream<T> resubscribeStream<T>(Stream<T> source) {
MultiStreamController<T>? current;
StreamSubscription<T>? sourceSubscription;
bool isDone = false;
void add(T value) {
current!.addSync(value);
}
void addError(Object error, StackTrace stack) {
current!.addErrorSync(error, stack);
}
void close() {
isDone = true;
current!.close();
current = null;
sourceSubscription = null;
}
return Stream<T>.multi((controller) {
if (isDone) {
controller.close(); // Or throw StateError("Stream has ended");
return;
}
if (current != null) throw StateError("Has listener");
current = controller;
var subscription = sourceSubscription ??=
source.listen(add, onError: addError, onDone: close);
subscription.resume();
controller
..onPause = subscription.pause
..onResume = subscription.resume
..onCancel = () {
current = null;
if (source.isBroadcast) {
sourceSubscription = null;
return subscription.cancel();
}
subscription.pause();
return null;
};
});
}
关于dart - 我可以多次监听 StreamController 的流(但一次不能订阅多个)吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70558849/