如何根据分组方法将一个永无止境的流拆分为多个结束流?
--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>
进入这些观察
--a--a-a-a-a-|
b---b-b--b-|
c-c---c-c-|
d-d-d-|
e...>
如你所见,a
在开头,我收到b
后,就不再收到a
了,所以应该结束。这就是为什么普通的 groupBy
不好。
最佳答案
您可以使用 window
和 share
源 Observable。 bufferCount(2, 1)
还有一个小技巧:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
source
.bufferCount(2, 1) // delay emission by one item
.map(arr => arr[0])
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
这会打印(因为 toArray()
):
[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]
此解决方案的问题在于订阅 source
的顺序。我们需要 window
通知器在第一个 bufferCount
之前订阅。否则,首先将一个项目推得更远,然后使用 .filter(([oldValue, newValue]) ...)
检查它是否与前一个不同。
这意味着需要在 window
之前延迟发射(这是第一个 .bufferCount(2, 1).map(arr => arr[0])
.
或者使用 publish()
自己控制订阅的顺序可能更容易:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
const connectable = source.publish();
connectable
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
connectable.connect();
输出是一样的。
关于javascript - RxJs 将流拆分为多个流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46836738/