javascript - 如何从无限的 RxJs 流中获取不是初始值的单个最新值?

标签 javascript angular redux rxjs

概念

这是一个模拟的 angular2 项目。

当使用来自 redux 存储的可观察流时,我尝试先过滤,然后获取/takeLast/last 最新值。 之后,我想在流完成时解决 promise ,但在使用 takeLast 运算符时却没有。

所以问题是:我可以使用什么运算符设置来从流中获取最新值

设置

我将我的 Angular 2 设置简化为 RxJs 使用的要点。

  • source observable 由 redux 库管理,未完成
  • 服务正在提供一些逻辑来从流中检索最新值
  • 组件是消费值(value) promise 风格

这是一个工作示例:https://fiddle.jshell.net/markus_falk/an41z6g9/

redux 存储模拟:

var latestTime$ = new Rx.Subject();
setInterval(function(){
     latestTime$.onNext(Date.now()); 
}, 2000);

服务可注入(inject)模拟:

var timeStore = null;
var getLatestTime = function() {

  return new Promise((resolve, reject) => {

     latestTime$

     /* 
        filter out 'null' for when the button is clicked
        before the store updates the first time
      */
     .filter(function(x) {
        console.log('filter: ', x);
        return x === typeof('number');
     })

     // try to end to stream by taking the last from the stream ?!?!?!?
     .takeLast(1)

     // handle promise
     .subscribe(

       function (x) {
         console.log('Next: ' + x);
         // store latest stream value
         timeStore = x;
       },
       function (err) {
         console.log('Error: ' + err);
         reject(err)
       },
       function () {
         console.log('Completed');
         // pass on latest value of endless when stream completes 
         resolve(timeStore);
       }

    );

  });

};

还有一个消费模拟组件:

document.querySelector("#foo").addEventListener("click", function(event) {

  var time = getLatestTime();

  time.then((latestTime) => {
    console.log('latestTime: ', latestTime);
  });

  time.catch((err) => {
    console.log('oh oh: ', err);
  });

}, false);

最佳答案

这应该模拟您的情况。

查看现场演示:https://jsfiddle.net/usualcarrot/zh07hfrc/1/

var subject = new Rx.Subject();

subject.skip(1).last().subscribe(function(val) {
  console.log('next:', val);
}, function(val) {
  console.log('error:', val);
}, function() {
  console.log('completed');
});

subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
subject.onCompleted();

这会打印到控制台:

next: 5
completed

而不是 console.log('completed'); 你会放 resolve(...)。也许这甚至不是必需的,您可以只返回 Subject 并订阅它(?),具体取决于您的用例。在那种情况下使用 asObservable() to hide the fact you're using a Subject .参见 similar use-case with asObservable() .

关于javascript - 如何从无限的 RxJs 流中获取不是初始值的单个最新值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39764370/

相关文章:

javascript - 如何在一个 Action 完成后在React js中调度一个 Action ?

reactjs - React/Redux 不更新状态

javascript - 如何远程模拟下拉选择

angular - 类型 'Observable' 缺少类型 'User' 的以下属性

angular - 当 Font Awesome 带宽使用量超过我本月计划允许的 npm 带宽时,会发生什么?

angular - 远程 Angular 应用程序和 Electron 桌面应用程序之间的通信

javascript - removeChild 有时会删除整个跨度,有时不会

javascript - 如何在 onload 事件之前运行 JavaScript 片段(Google Chrome 扩展)

javascript - 为什么我的标题文本被 chop 了?

javascript - ScrollView minHeight 占满所有空间