arrays - rxjs 从两个来源追上订阅

标签 arrays node.js functional-programming rxjs

我需要将追赶和订阅新提要结合起来。因此,首先我在数据库中查询我错过的所有新记录,然后切换到发布订阅以获取所有传入的新记录。

第一部分很容易进行查询,也许批量为 500 个,这将为您提供一个数组,您可以 rx.observeFrom 。

第二部分很简单,你只需在 pubsub 上放置一个 rx.observe 即可。

但我需要做的是按顺序进行,因此我需要在开始播放新唱片之前播放所有旧唱片。

我想我可以开始订阅 pubsub,将它们放入一个数组中,然后开始处理旧的,当我完成后,要么删除重复项(或者因为我做了重复项检查),要么允许少数重复项,但播放累积的记录,直到它们消失,然后一进一出。

我的问题是最好的方法是什么?我应该创建一个订阅来开始在数组中构建新记录,然后开始处理旧记录,然后在旧记录过程的“然后”中订阅另一个数组吗?

好的,这就是我到目前为止所拥有的。我需要构建测试并完成一些伪代码以查明它是否有效,更不用说是一个好的实现了。在我埋葬自己之前,请随意阻止我。

var catchUpSubscription = function catchUpSubscription(startFrom) {
    EventEmitter.call(this);
    var subscription = this.getCurrentEventsSubscription();
    // calling map to start subscription and catch in an array.  
    // not sure if this is right
    var events = rx.Observable.fromEvent(subscription, 'event').map(x=> x);
    // getPastEvents gets batches of 500 iterates over and emits each 
   // till no more are returned, then resolves a promise
    this.getPastEvents({count:500, start:startFrom})
    .then(function(){
        rx.Observable.fromArray(events).forEach(x=> emit('event', x));
    });
};

我不知道这是最好的方法。有什么想法吗?

谢谢

最佳答案

我会避免不必要地混合不同的异步策略。您可以使用 concat 将两个序列连接在一起:

var catchUpSubscription = function catchUpSubscription(startFrom) {
    var subscription = this.getCurrentEventsSubscription();

    return Rx.Observable.fromPromise(this.getPastEvents({count:500, start:startFrom}))
     .flatMap(x => x)
     .concat(Rx.Observable.fromEvent(subscription, 'event'));

};

///Sometime later
catchUpSubscription(startTime).subscribe(x => /*handle event*/)

关于arrays - rxjs 从两个来源追上订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32232969/

相关文章:

functional-programming - 查找输入中某个标记值(例如 -999)之前出现的非负数的平均值

javascript - Highcharts 生成不正确

jquery - 在每个循环中使用 jQuery 属性从选择器开始

angularjs - 通过注入(inject)控制单元测试 Angular 误差

authentication - 如何向 Scotty 中间件添加基本身份验证?

javascript - 如何在没有尾调用优化的情况下用函数式编程替代方法替换 while 循环?

c++ - c++中表达式必须有常量值错误

java - 二分查找compareTo字符串对象

node.js - 在 express 中,当我调用next(err)时,下一个订单号/中间件不会被调用/处理

javascript - 随叫随到执行 MySQL 操作 - Node.js