我有一个 Rxjs 可观察对象(下面代码中的 stream
),它发出可观察对象(subjOne
和 subjTwo
)。每个内部可观察对象都可以随时以任何顺序发出自己的值。我的任务是从 subjOne 捕获值,直到 subjTwo 发出它的第一个值。
const subjOne = new Subject();
const subjTwo = new Subject();
const stream = Observable.create(observer => {
observer.next(subjOne);
observer.next(subjTwo);
});
stream
.someOperator(subj => subj)
.subscribe(value => console.log('Value: ', value));
示例 1:
subjOne
发出值 1 和 2,然后 subjTwo
发出值 3,然后 subjOne
发出值 4。
输出应为:1、2、3。
示例 2:
subjTwo
发出 1,然后 subjOne
发出 2。
输出应该是 1。
switchMap 不适合这里,因为一旦 stream
发出 subjTwo
,它就会从 subjOne
中删除值。关于如何实现这一目标的任何想法?谢谢。
更新:在我的实际案例中,不仅有两个内部可观察对象——subjOne
和 subjTwo
——而且它们是一个恒定的流,因此手动硬编码 subjOne.takeUntil(subjTwo)
不是一个可行的选择。
最佳答案
我认为这可以满足您的需求:
// scan to let us keep combining the previous observable
// with the next observable
source
.scan((current, next) => {
// takeUntil to stop current when next produces
const currentUntil = current.takeUntil(next);
// now merge current with next
return currentUntil.merge(next)
}, Rx.Observable.empty())
// switch to the most recent version of the combined inner observables
.switch();
请注意,这只有在内部可观察对象热时才能正常工作。如果它们是冷可观察对象,则需要更多代码才能实现。
关于Javascript 可观察量 : need the functionality of a switchMap but with a slight difference,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43808774/