javascript - rxjs 在处理数据之前检查流是否为空

标签 javascript rxjs

我们有以下流。

const recorders = imongo.listCollections('recorders')
    .flatMapConcat(names => {
        const recorders = names
            .map(entry => entry.name)
            .filter(entry => !_.contains(
                ['recorders.starts',
                 'recorders.sources',
                 'system.indexes',
                 'system.users'],
                entry));
        console.log(recorders);
        return Rx.Observable.fromArray(recorders);
    });

recorders.isEmpty()
    .subscribe(
        empty => {
            if(empty) {
                logger.warn('No recorders found.');
            }
        },
        () => {}
    );

recorders.flatMapConcat(createRecorderIntervals)
    .finally(() => process.exit(0))
    .subscribe(
        () => {},
        e => logger.error('Error while updating: %s', e, {}),
        () => logger.info('Finished syncing all recorders')
    );

如果流为空,那么我们不想createRecorderIntervals。上面的代码正在运行。但是,检查流是否为空会导致 console.log 被执行两次。为什么会发生这种情况?我能以某种方式修复它吗?

编辑:所以,在重新考虑了@Martin的回答后,我采取了以下方式

const recorders = imongo.listCollections('recorders')
    .flatMapConcat(names => {
        const recorders = names
            .map(entry => entry.name)
            .filter(entry => !_.contains(
                ['recorders.starts',
                 'recorders.sources',
                 'system.indexes',
                 'system.users'],
                entry));

        if(!recorders.length) {
            logger.warn('No recorders found.');
            return Rx.Observable.empty();
        }

        return Rx.Observable.fromArray(recorders);
    })
    .flatMapConcat(createRecorderIntervals)
    .finally(() => scheduleNextRun())
    .subscribe(
        () => {},
        e => logger.error('Error while updating: %s', e, {}),
        () => logger.info('Finished syncing all recorders')
    );

最佳答案

当您在 Observable 上调用 subscribe() 方法时,它会创建整个运算符链,然后调用 imongo.listCollections('recorders ') 在你的情况下两次。

您可以在调用 flatMapConcat(createRecorderIntervals) 之前插入一个运算符来检查结果是否为空。我特别考虑其中一个,但可能还有其他更适合您的需求:

  • takeWhile() - 将谓词作为参数,并在返回 false 时发出 onComplete

那么你的代码将如下所示:

const recorders = imongo.listCollections('recorders')
    .flatMapConcat(names => {
        ...
        return Rx.Observable.fromArray(recorders);
    })
    .takeWhile(function(result) {
        // condition
    })
    .flatMapConcat(createRecorderIntervals)
        .finally(() => process.exit(0))
        .subscribe(...);

我不知道你的代码到底是做什么的,但我希望你明白。

编辑:如果您想在整个 Observable 为空时收到通知,有多种方法:

  • do()运算符和自定义Observer目的。您将编写一个自定义观察者,并在 .flatMapConcat(createRecorderIntervals) 之前使用 do() 运算符将其放置。该对象将计算其 next 回调被调用的次数,并且当前面的 Observable 完成时,您可以判断是否至少有一个结果或根本没有结果。

  • 创建 ConnectableObservable 。这可能与我们一开始所做的最相似。您将使用 publish() 运算符将您的 recorders 转换为 ConnectableObservable。然后就可以订阅多个Observer,而不需要触发算子链。当您订阅了所有观察者后,您可以调用 connect() ,它将按顺序向所有观察者发送值:

    var published = recorders.publish();
    
    published.subscribe(createObserver('SourceA'));
    published.subscribe(createObserver('SourceB'));
    
    // Connect the source
    var connection = published.connect();
    

    在您的情况下,您将创建两个 Subjects (因为它们同时充当 Observable 和 Observer),并将其中一个与 isEmpty() 链接起来,第二个是 flatMapConcat()。有关更多信息,请参阅文档:http://reactivex.io/documentation/operators/connect.html

我认为第一个选项实际上对您来说更容易。

关于javascript - rxjs 在处理数据之前检查流是否为空,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39698287/

相关文章:

javascript - Backbone 和 Express 路由器可以在 Express 应用程序中协同工作吗?

javascript - 如何在所有文档中使用 Firebase?

angular - do/tap 运算符和其他可观察运算符有什么区别?

javascript - 单击同一行中的按钮后减少一个值 - knockout

javascript - Angular Material 2 md-tab内容高度

javascript - 如何跟踪 Reactjs 中功能组件的状态?

angular - rxjs:定期执行一些 Action ,中间有特定的延迟

angular - .subscribe(x=> ...) 和 .subscribe(next(...)) 的区别?

angular - Angular 在哪里为 *ngIf 定义 "as local-var"行为?

javascript - api 调用在一种方法中工作但在另一种方法中不起作用 ---TypeError : Cannot read property 'getResponse' of undefined