Dart Streams 有没有办法做类似的事情await stream.first
连续多次使用流有点像堆栈/队列?
据我了解,您可以通过在现有的 Single-substription 流上使用 asBroadcastStream 来做到这一点,但我觉得这并不理想。
也许有一种带有一些 Rx 包的缓冲区原语或类似的东西?
我的用例如下:
我有一个 StreamChannel
( IOWebSocketChannel
) 我想按特定顺序发送/接收消息。
IE:
Send Message0
Receive Message1
Send Message2
Receive Message3
我很确定
Message1
只有在服务器收到 Message0
后才会到达(等等
Message2
和 Message3
)
最佳答案
Dart 有一个优点(在我看来,缺点)是对基于推送的事件和基于拉取的资源读取具有单一抽象。这导致了许多其他困惑,例如 "Do I have to cancel stream subscriptions" .
在基于推送的模型中,说:
abstract class Element {
Stream<MouseEvent> get onClick;
}
当点击发生时,类(class)会通知您。可能有 0 和实际上无限的点击事件之间的任何地方,并且不希望以与读取资源相同的方式缓冲或处理它们(尤其是考虑到 Dart 是单线程的)。另一个注意事项:这个流有任意数量的订阅者是完全有效的(多个类可能有兴趣知道点击何时发生)。
另一方面,有一个基于拉的模型,例如读取文件:
abstract class File {
Stream<String> readLines();
}
在这种情况下,您可能希望逐行处理,甚至可能在您点击某个字符串后停止,并且您肯定希望收到 EOF 的通知(通常通过“完成”事件或流关闭,以惯用语 Dart )。另一个注意事项:拥有超过 1 个订阅者是无效的 - 这会很快变得棘手。
对于您的特定问题,您似乎想要:
好的,让我们来看看你的问题的细节:
As I understand it, you could do that by using asBroadcastStream on an existing Single-substription stream, but I feel like this isn't ideal.
非常不理想。单订阅流(我在脑海中称这些“资源流”,就像上面的
readLines
调用)会自动缓冲事件并等待订阅者。没有收到一行文本真的很糟糕,因为订阅是在读取文件之后发生的。另一方面,广播流不缓冲事件。因此,如果您根据时间问题等采用您的方法,您可能会发现自己丢失了发送的事件。
这里有几个选项。没有一个是完美的,但它们可能会有所帮助:
package:stream_transform
包有一组常用的转换 Stream
,包括一些受 RX 启发的。 package:async
包还有其他用于处理异步代码的实用程序。具体在这里,您可能会找到 StreamQueue
完全符合您的要求:Future<void> processEvents(Stream<String> inputStream) async {
var queue = new StreamQueue(inputStream);
while (await queue.hasNext) {
var next = await queue.next;
// Insert processing here.
}
}
我发现自己希望
StreamQueue
曾在 dart:async
,并且 Stream
之间没有共享接口(interface)s 用于资源和用于事件的流,但今天这是一种不错的方法。干杯!
关于stream - 如何在 Dart 中使用 StreamChannel 进行双向通信?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50006277/