我们有以下流。
const recorders = imongo.listCollections('recorders')
.flatMapConcat(names => {
const recorders = names
.map(entry => entry.name)
.filter(entry => !_.contains(
['recorders.starts',
'recorders.sources',
'system.indexes',
'system.users'],
entry));
console.log(recorders);
return Rx.Observable.fromArray(recorders);
});
recorders.isEmpty()
.subscribe(
empty => {
if(empty) {
logger.warn('No recorders found.');
}
},
() => {}
);
recorders.flatMapConcat(createRecorderIntervals)
.finally(() => process.exit(0))
.subscribe(
() => {},
e => logger.error('Error while updating: %s', e, {}),
() => logger.info('Finished syncing all recorders')
);
如果流为空,那么我们不想createRecorderIntervals
。上面的代码正在运行。但是,检查流是否为空会导致 console.log 被执行两次。为什么会发生这种情况?我能以某种方式修复它吗?
编辑:所以,在重新考虑了@Martin的回答后,我采取了以下方式
const recorders = imongo.listCollections('recorders')
.flatMapConcat(names => {
const recorders = names
.map(entry => entry.name)
.filter(entry => !_.contains(
['recorders.starts',
'recorders.sources',
'system.indexes',
'system.users'],
entry));
if(!recorders.length) {
logger.warn('No recorders found.');
return Rx.Observable.empty();
}
return Rx.Observable.fromArray(recorders);
})
.flatMapConcat(createRecorderIntervals)
.finally(() => scheduleNextRun())
.subscribe(
() => {},
e => logger.error('Error while updating: %s', e, {}),
() => logger.info('Finished syncing all recorders')
);
最佳答案
当您在 Observable
上调用 subscribe()
方法时,它会创建整个运算符链,然后调用 imongo.listCollections('recorders ')
在你的情况下两次。
您可以在调用 flatMapConcat(createRecorderIntervals)
之前插入一个运算符来检查结果是否为空。我特别考虑其中一个,但可能还有其他更适合您的需求:
-
takeWhile()
- 将谓词作为参数,并在返回false
时发出onComplete
。
那么你的代码将如下所示:
const recorders = imongo.listCollections('recorders')
.flatMapConcat(names => {
...
return Rx.Observable.fromArray(recorders);
})
.takeWhile(function(result) {
// condition
})
.flatMapConcat(createRecorderIntervals)
.finally(() => process.exit(0))
.subscribe(...);
我不知道你的代码到底是做什么的,但我希望你明白。
编辑:如果您想在整个 Observable 为空时收到通知,有多种方法:
do()
运算符和自定义Observer目的。您将编写一个自定义观察者,并在.flatMapConcat(createRecorderIntervals)
之前使用do()
运算符将其放置。该对象将计算其next
回调被调用的次数,并且当前面的 Observable 完成时,您可以判断是否至少有一个结果或根本没有结果。创建
ConnectableObservable
。这可能与我们一开始所做的最相似。您将使用publish()
运算符将您的recorders
转换为ConnectableObservable
。然后就可以订阅多个Observer,而不需要触发算子链。当您订阅了所有观察者后,您可以调用connect()
,它将按顺序向所有观察者发送值:var published = recorders.publish(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); // Connect the source var connection = published.connect();
在您的情况下,您将创建两个
Subjects
(因为它们同时充当 Observable 和 Observer),并将其中一个与isEmpty()
链接起来,第二个是flatMapConcat()
。有关更多信息,请参阅文档:http://reactivex.io/documentation/operators/connect.html
我认为第一个选项实际上对您来说更容易。
关于javascript - rxjs 在处理数据之前检查流是否为空,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39698287/