stream - 如何在 Dart 中使用 StreamChannel 进行双向通信?

标签 stream dart

Dart Streams 有没有办法做类似的事情await stream.first连续多次使用流有点像堆栈/队列?

据我了解,您可以通过在现有的 Single-substription 流上使用 asBroadcastStream 来做到这一点,但我觉得这并不理想。

也许有一种带有一些 Rx 包的缓冲区原语或类似的东西?

我的用例如下:

我有一个 StreamChannel ( IOWebSocketChannel ) 我想按特定顺序发送/接收消息。

IE:

Send Message0
Receive Message1
Send Message2
Receive Message3

我很确定Message1只有在服务器收到 Message0 后才会到达
(等等 Message2Message3 )

最佳答案

Dart 有一个优点(在我看来,缺点)是对基于推送的事件和基于拉取的资源读取具有单一抽象。这导致了许多其他困惑,例如 "Do I have to cancel stream subscriptions" .

在基于推送的模型中,说:

abstract class Element {
  Stream<MouseEvent> get onClick;
}

当点击发生时,类(class)会通知您。可能有 0 和实际上无限的点击事件之间的任何地方,并且不希望以与读取资源相同的方式缓冲或处理它们(尤其是考虑到 Dart 是单线程的)。另一个注意事项:这个流有任意数量的订阅者是完全有效的(多个类可能有兴趣知道点击何时发生)。

另一方面,有一个基于拉的模型,例如读取文件:
abstract class File {
  Stream<String> readLines();
}

在这种情况下,您可能希望逐行处理,甚至可能在您点击某个字符串后停止,并且您肯定希望收到 EOF 的通知(通常通过“完成”事件或流关闭,以惯用语 Dart )。另一个注意事项:拥有超过 1 个订阅者是无效的 - 这会很快变得棘手。

对于您的特定问题,您似乎想要:
  • 发送您自己的事件
  • 按需接收下一个事件
  • 处理事件,发送自己的事件
  • 等等...

  • 好的,让我们来看看你的问题的细节:

    As I understand it, you could do that by using asBroadcastStream on an existing Single-substription stream, but I feel like this isn't ideal.



    非常不理想。单订阅流(我在脑海中称这些“资源流”,就像上面的 readLines 调用)会自动缓冲事件并等待订阅者。没有收到一行文本真的很糟糕,因为订阅是在读取文件之后发生的。

    另一方面,广播流不缓冲事件。因此,如果您根据时间问题等采用您的方法,您可能会发现自己丢失了发送的事件。

    这里有几个选项。没有一个是完美的,但它们可能会有所帮助:
  • package:stream_transform 包有一组常用的转换 Stream ,包括一些受 RX 启发的。
  • package:async 包还有其他用于处理异步代码的实用程序。具体在这里,您可能会找到 StreamQueue 完全符合您的要求:
    Future<void> processEvents(Stream<String> inputStream) async {
      var queue = new StreamQueue(inputStream);
      while (await queue.hasNext) {
        var next = await queue.next;
        // Insert processing here.
      }
    }
    

  • 我发现自己希望 StreamQueue曾在 dart:async ,并且 Stream 之间没有共享接口(interface)s 用于资源和用于事件的流,但今天这是一种不错的方法。

    干杯!

    关于stream - 如何在 Dart 中使用 StreamChannel 进行双向通信?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50006277/

    相关文章:

    file - createReadStream 发送文件http

    java - Jasper 和 .xlsx 的内容消息不可读

    delphi - DataSnap XE2 和 TStream 方法参数

    android - 在 flutter 中具有拖动效果的滚动条

    FlutterError(未安装dispose(): (lifecycle state: defunct,之后调用的setState()

    dart - 如何在 CupertinoTabView 中使用 Flutter Textfield 而不会在点击时引发异常?

    dart - 如何在 onTap 中使用条件语句

    file-io - fdopen() 会导致内存泄漏吗?

    import - Dart库导入快捷方式

    dart - future 返回空列表