Javascript 可观察量 : need the functionality of a switchMap but with a slight difference

标签 javascript rxjs observable

我有一个 Rxjs 可观察对象(下面代码中的 stream),它发出可观察对象(subjOnesubjTwo)。每个内部可观察对象都可以随时以任何顺序发出自己的值。我的任务是从 subjOne 捕获值,直到 subjTwo 发出它的第一个值。

const subjOne = new Subject();
const subjTwo = new Subject();

const stream = Observable.create(observer => {
    observer.next(subjOne);
    observer.next(subjTwo);
});

stream
    .someOperator(subj => subj)
    .subscribe(value => console.log('Value: ', value));

示例 1: subjOne 发出值 1 和 2,然后 subjTwo 发出值 3,然后 subjOne 发出值 4。 输出应为:1、2、3。

示例 2: subjTwo 发出 1,然后 subjOne 发出 2。 输出应该是 1。

switchMap 不适合这里,因为一旦 stream 发出 subjTwo,它就会从 subjOne 中删除值。关于如何实现这一目标的任何想法?谢谢。

更新:在我的实际案例中,不仅有两个内部可观察对象——subjOnesubjTwo——而且它们是一个恒定的流,因此手动硬编码 subjOne.takeUntil(subjTwo) 不是一个可行的选择。

最佳答案

我认为这可以满足您的需求:

// scan to let us keep combining the previous observable
// with the next observable
source
  .scan((current, next) => {
    // takeUntil to stop current when next produces
    const currentUntil = current.takeUntil(next);
    // now merge current with next
    return currentUntil.merge(next)
  }, Rx.Observable.empty())
  // switch to the most recent version of the combined inner observables
  .switch();

请注意,这只有在内部可观察对象时才能正常工作。如果它们是冷可观察对象,则需要更多代码才能实现。

关于Javascript 可观察量 : need the functionality of a switchMap but with a slight difference,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43808774/

相关文章:

javascript - Twilio .sendDigits() 不适用于 WebRTC 客户端出站调用

javascript - RxJS 管道中的 Observables 是否有更简单、更优雅的解决方案?

typescript - Angular 2 : Catching 401 error for token refresh

angular - 如何在 Angular 和 Jasmine 中对取消订阅进行单元测试?

swift - 加载大量项目时的尾递归

javascript - 如何从 WordPress 中的 ajax 处理程序调用我主题的 functions.php 中的方法?

javascript - 如果单击某些选择选项,则在表单上输入自定义值

javascript - 无法解构 'data' 的属性 '(intermediate value)',因为它未定义

javascript - 将 Observable 的输出从对象转换为数组

angular - RxJS:Share() 一个 Observable 并向每个新订阅者发出最后一个值