这个RxJava buffer example (带有marble chart!)完美地描述了所需的结果:
collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator
编辑:已审查How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval? ,我的问题似乎与使用主题而不是直接可观察有关。
使用套接字流生成窗口关闭事件(如下)会导致打开 2 个套接字并且没有事件流出:
ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver);
var closer = ws.flatMapFirst(Rx.Observable.timer(250));
ws.buffer(closer)
.subscribe(function(e) { console.log(e, 'socket messages');});
最佳答案
在此总结调查结果问题:
-
Rx.DOM.fromWebSocket
返回Rx.subject
它环绕着 websocket。该主题由一个观察者和一个可观察者组成(通过new Rx.Subject(observer, observable)
。根据我的理解,该观察者允许通过其onNext
方法写入套接字,而可观察者允许从套接字读取。 - 您总是读到主题是热源,但显然这里这只意味着观察者将立即将其值推送到主题,这里将其推送到套接字。正常情况下(
new Rx.Subject()
),默认的观察者和可观察者是这样的,即可观察者监听观察者,因此默认的可观察者是热的。然而,在这里,可观察的是冷源,然后任何订阅都将重新执行创建另一个 websocket 的回调。因此创建了两个套接字。 - 例如
Rx.dom.fromEvent
就不会发生这种情况因为创建的(冷)可观察量是共享的(通过publish().refCount()
)。 - 因此,通过在这里执行相同的操作,可以解决重复问题。这意味着在这种特殊情况下,请在代码中使用
ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share();
,share
是publish().refCount()
的别名. - 我想知道
Rx.DOM.fromWebSocket
的行为是否如此应该被报告为错误
两种方法的代码:
关于javascript - 如何使用 fromWebSocket 缓冲流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33495390/