javascript - RxJS 根据外部条件从流中提取数据

标签 javascript rxjs rxjs5

上下文:我有一个应用程序的日志库,我想使用 Rx subject 将日志写入数据库。 问题:日志是在建立与数据库的连接之前生成的,并且该连接可能随时变得不可用,但我可以观察到它要么包含具有使用数据库的方法的对象,要么包含 undefined object 。我想要做的是在数据库持久性未定义时将日志保留在流中,并在可用时恢复。

我认为代码应该如下所示:

logsSubject
           .takeWhen(/* test for database persistence */) // made up name because I don't know a Rx method that does what I want
           .subscribe(/* write data to database */);

但是我不确定这是否真的可能,因为我使用 RxJS 的时间很短

最佳答案

以下代码用于模拟您的场景。数据库每 10 秒连接一次,连接后每 5 秒断开一次。在实际用例中,连接和断开连接应该都是共享的 Observable 或Subject。我在这里使用合并而不是 takeUntil 运算符,因为 takeUntil 将刚刚完成可观察的序列,并且您将无法恢复它。 (repeatWhen 运算符应该可以工作,但在 Rxjs 5 中似乎缺失)

//emit value every 1s
const logWrite = Rx.Observable.interval(1000)

const dbConnected = new Rx.Subject()
// make it fire every 10s 
Rx.Observable.interval(10000).do(()=>dbConnected.next('db 
connected')).subscribe()

const dbDisconnected = Rx.Observable.timer(3000)
.flatMap(()=>Rx.Observable.throw('db disconnected'))


const writeWhenConnected = logWrite.merge(dbDisconnected)
.retryWhen(function(errors) {
    return errors.switchMap(()=>dbConnected);
});

writeWhenConnected.subscribe(val => console.log(val),console.warn);

关于javascript - RxJS 根据外部条件从流中提取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46315326/

相关文章:

javascript - Node 模块@kenjiuno/msgreader错误: MsgReader is not a constructor的原因

javascript - 从函数返回 Observable

javascript - Vue 中的 RxJS 入门

AJAX 请求的 RxJS 限制

javascript - 在 RxJS 中是否可以在 auditTime() 操作中测试未决值?

javascript - 如何在 Angular 中等待守卫

rxjs - 为什么订阅者会听到主题上的旧错误?

javascript - 在开发模式下阻止本地主机上的 Google 广告

javascript - 如何自定义JavaScript提示?

javascript - 使用 JavaScript 仅为在父级中单击的子级更改类