javascript - 如何使用 fromWebSocket 缓冲流

标签 javascript reactive-programming rxjs

这个RxJava buffer example (带有marble chart!)完美地描述了所需的结果:

collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator

编辑:已审查How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval? ,我的问题似乎与使用主题而不是直接可观察有关。

使用套接字流生成窗口关闭事件(如下)会导致打开 2 个套接字并且没有事件流出:

ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver);
var closer = ws.flatMapFirst(Rx.Observable.timer(250));
ws.buffer(closer)
    .subscribe(function(e) { console.log(e, 'socket messages');});

最佳答案

在此总结调查结果问题:

  • Rx.DOM.fromWebSocket返回 Rx.subject它环绕着 websocket。该主题由一个观察者和一个可观察者组成(通过 new Rx.Subject(observer, observable) 。根据我的理解,该观察者允许通过其 onNext 方法写入套接字,而可观察者允许从套接字读取。
  • 您总是读到主题是热源,但显然这里这只意味着观察者将立即将其值推送到主题,这里将其推送到套接字。正常情况下(new Rx.Subject()),默认的观察者和可观察者是这样的,即可观察者监听观察者,因此默认的可观察者是热的。然而,在这里,可观察的是冷源,然后任何订阅都将重新执行创建另一个 websocket 的回调。因此创建了两个套接字。
  • 例如 Rx.dom.fromEvent 就不会发生这种情况因为创建的(冷)可观察量是共享的(通过 publish().refCount() )。
  • 因此,通过在这里执行相同的操作,可以解决重复问题。这意味着在这种特殊情况下,请在代码中使用 ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share(); , sharepublish().refCount() 的别名.
  • 我想知道 Rx.DOM.fromWebSocket 的行为是否如此应该被报告为错误

两种方法的代码:

关于javascript - 如何使用 fromWebSocket 缓冲流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33495390/

相关文章:

javascript - 管道操作符 RXJS Angular 5 的顺序

Angular 4 - 处理迭代中嵌套订阅的最佳方法是什么

javascript - 如何组合关联数组中的重复项?

javascript - 使用保存键按属性值对 Javascript 对象数组进行排序

java - RxJava : Can a result be split into two different observables?

javascript - shouldComponentUpdate 等效于功能组件,以忽略状态更改

javascript - 在 vuex 操作中使用 fetch 方法

javascript - 如何设置 useEffect 在第一次使用 eslint-react-hooks 渲染时从 API 获取数据?

c# - 有条件地组合两个 Rx 流

javascript - 为什么我不能将 .map() 函数链接到可观察对象上