javascript - RxJs 将流拆分为多个流

标签 javascript rxjs system.reactive

如何根据分组方法将一个永无止境的流拆分为多个结束流?

--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>

进入这些观察

--a--a-a-a-a-|
             b---b-b--b-|
                        c-c---c-c-|
                                  d-d-d-|
                                        e...>

如你所见,a在开头,我收到b后,就不再收到a了,所以应该结束。这就是为什么普通的 groupBy 不好。

最佳答案

您可以使用 windowshare 源 Observable。 bufferCount(2, 1) 还有一个小技巧:

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

source
    .bufferCount(2, 1) // delay emission by one item
    .map(arr => arr[0])
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

这会打印(因为 toArray()):

[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]

此解决方案的问题在于订阅 source 的顺序。我们需要 window 通知器在第一个 bufferCount 之前订阅。否则,首先将一个项目推得更远,然后使用 .filter(([oldValue, newValue]) ...) 检查它是否与前一个不同。

这意味着需要在 window 之前延迟发射(这是第一个 .bufferCount(2, 1).map(arr => arr[0]).

或者使用 publish() 自己控制订阅的顺序可能更容易:

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

const connectable = source.publish();

connectable
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

connectable.connect();

输出是一样的。

关于javascript - RxJs 将流拆分为多个流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46836738/

相关文章:

javascript - 如何在 JavaScript 中将 addEventListener 指定为 ON?

javascript - 为什么我的 scrapy 下载器中间件无法正确渲染 javascript?

javascript - Math.ceil 到位置 1 最接近的五

javascript - 从纯 javascript 使用 salesforce chatter rest 服务

angular - refCount off 的 rxjs shareReplay 在第一个下游订阅者之前不会订阅源

angular - 对于相互依赖数据的请求,可观察量是否是正确的解决方案?

rxjs - 如何获取 RxJs 中的BehaviorSubject 中哪些对象发生变化?

c# - 当任何值由另一个 Observable 产生时,选择 Observable 的最新值

c# - 可观察到 Rx 中的回调

c# - 递归和 Rx 并行性