javascript - RxJS:如何将多个嵌套的可观察对象与缓冲区结合起来

标签 javascript rxjs observable rename rxjs5

警告:这里是 RxJS 新手。

这是我的挑战:

  1. onUnlink$ observable 发出...
  2. 立即开始从 onAdd$ observable 中捕获值,最多持续 1 秒(我将此分区称为 onAddBuffer$)。
  3. 查询数据库(创建一个 doc$ observable)以获取我们将用于匹配 onAdd$ 值之一的模型
  4. 如果 onAddBuffer$ 可观察值中的一个值与 doc$ 值匹配,则不发射
  5. 如果 onAddBuffer$ 可观察值中没有任何值与 doc$ 值匹配,或者如果 onAddBuffer$ 可观察值从不发出,则发出doc$

这是我最好的猜测:

// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap( unlinkValue => {

  const doc$ = Rx.Observable.fromPromise( db.File.findOne({ unlinkValue }) )

  const onAddBuffer$ = onAdd$
    .buffer( doc$ ) // capture events while fetching from db -- not sure about this
    .takeUntil( Rx.Observable.timer(1000) );

  // if there is a match, emit nothing. otherwise wait 1 second and emit doc
  return doc$.switchMap( doc =>
    Rx.Observable.race( 
      onAddBuffer$.single( added => doc.attr === added.attr ).mapTo( Rx.Observable.empty() ),
      Rx.Observable.timer( 1000 ).mapTo( doc )
    )
  );
});

docsToRemove$.subscribe( doc => {
  // should only ever be invoked (with doc -- the doc$ value) 1 second
  // after `onUnlink$` emits, when there are no matching `onAdd$`
  // values within that 1 second window.
})

这总是发出 EmptyObservable。也许是因为 single 似乎在没有匹配时发出 undefined ,而我期望它在没有匹配时根本不会发出? find 也会发生同样的事情。

如果我将 single 更改为 filter,则不会发出任何内容。

仅供引用:这是一个包含文件系统事件的重命名场景——如果 add 事件在 unlink 事件发生后的 1 秒内发生并且发出的文件哈希匹配,则执行没什么,因为它是一个重命名。否则它是一个真正的 unlink 并且它应该发出要删除的数据库文档。

最佳答案

这是我的猜测,你可以如何做到这一点:

onUnlink$.concatMap(unlinkValue => {
  const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();
  const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);
  const onAddBuffer$ = onAdd$.buffer(bufferDuration$);

  return Observable.forkJoin(onAddBuffer$, doc$)
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ });
});

single() 运算符有点棘手,因为它仅在源 Observable 完成 后才发出与谓词函数匹配的项(或在出现错误时发出错误)是两个项目或没有匹配的项目)。

race() 也很棘手。如果其中一个源 Observables 完成并且没有发出任何值 race() 将完成并且不会发出任何东西。我前段时间报告过这个,这是正确的行为,请参阅 https://github.com/ReactiveX/rxjs/issues/2641 .
我想这就是您的代码中的错误所在。

另请注意,.mapTo(Rx.Observable.empty()) 会将每个值映射到 Observable 的实例中。如果您想忽略所有值,您可以使用 filter(() => false)ignoreElements() 运算符。

关于javascript - RxJS:如何将多个嵌套的可观察对象与缓冲区结合起来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47870632/

相关文章:

javascript - div 切换无法按我想要的方式工作

javascript - 是否可以在友好的 iframe 中注册 Service Worker?

node.js - 从 NodeJS Express 'post' 可观察到的 ReactiveX

javascript - 如何使用 Observable 编写函数?

javascript - Jquery多加减脚本

javascript - 根据以下数据创建自定义对象

properties - 检测复合属性的变化

Angular 6 防护可观察悬挂

javascript - RxJS:auditTime 和 sampleTime 之间的区别?

angular - 组件中的 rxjs 行为主题设置值