javascript - RxJS,如何轮询 API 以使用动态时间戳持续检查更新的记录

标签 javascript rxjs

我是 RxJS 的新手,我正在尝试编写一个应用程序来完成以下事情:

  1. 加载时,发出 AJAX 请求(为简单起见伪造为 fetchItems())以获取项目列表。
  2. 在此之后的每一秒,发出一个 AJAX 请求以获取项目。
  3. 检查新项目时,只有在最近时间戳之后更改的项目才应返回。
  4. 在可观察对象之外不应该有任何状态。

我的 first attempt非常直接,实现了目标 1、2 和 4。

var data$ = Rx.Observable.interval(1000)
  .startWith('run right away')
  .map(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`, but
    // I am not yet tracking the `modifiedSince` timestamp yet so all items will always be returned
    return fetchItems(); 
  });

现在我很兴奋,这很容易,实现目标 3 不会那么难......几个小时后,这是 where I am at :

var modifiedSince = null;
var data$ = Rx.Observable.interval(1000)
  .startWith('run right away')
  .flatMap(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
    return fetchItems(modifiedSince);
  })
  .do(function(item) {
    if(item.updatedAt > modifiedSince) {
      modifiedSince = item.updatedAt;
    }
  })
  .scan(function(previous, current) {
    previous.push(current);
    return previous;
  }, []);

这解决了目标 3,但在目标 4 上倒退了。我现在将状态存储在 observable 之外。

我假设全局 modifiedSince.do() block 不是实现此目的的最佳方式。任何指导将不胜感激。

编辑:希望通过这个问题澄清我在寻找什么。

最佳答案

这是另一种不使用闭包或“外部状态”的解决方案。

我做了以下假设:

  • fetchItems 返回项目的 Rx.Observable,即不是项目数组

它利用 expand 运算符,允许发出遵循 x_n+1 = f(x_n) 类型递归关系的值。您通过返回一个发出该值的可观察对象来传递 x_n+1,例如 Rx.Observable.return(x_n+1) 并且您可以通过返回 来完成递归>Rx.Observable.empty()。这里似乎没有结束条件,因此它将永远运行。

scan 还允许按照递归关系发出值 (x_n+1 = f(x_n, y_n))。不同之处在于 scan 强制您使用同步函数(因此 x_n+1y_n 同步),而 expand 您可以使用可观察对象形式的异步函数。

代码没有经过测试,所以不管它是否有效,请及时通知我。

相关文档:expand , combineLatest

var modifiedSinceInitValue = // put your date here
var polling_frequency = // put your value here
var initial_state = {modifiedSince: modifiedSinceInitValue, itemArray : []}
function max(property) {
  return function (acc, current) {
    acc = current[property] > acc ? current[property] : acc;
  }
}    
var data$ = Rx.Observable.return(initial_state)
  .expand (function(state){
             return fetchItem(state.modifiedSince)
                   .toArray()
                   .combineLatest(Rx.Observable.interval(polling_frequency).take(1), 
                     function (itemArray, _) {
                       return {
                         modifiedSince : itemArray.reduce(max('updatedAt'), modifiedSinceInitValue), 
                         itemArray : itemArray
                       }
                     }

  })

关于javascript - RxJS,如何轮询 API 以使用动态时间戳持续检查更新的记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34545217/

相关文章:

javascript - 在第一行有 rowspan 的情况下,在表格的第一行设置 css 样式 - 多表

javascript - session flash 消息仅在单独刷新后显示,而不在 express 中显示

javascript - 如何更改 css 中的 td、tr 值?

angular - 扩展 Angular 2 ngModel 指令以使用可观察对象

javascript - RxJS 连接/合并异步请求超时

angular - 如何将 "monkey patch"设置为 Zone.js 的 Observable?

javascript - IE9上加载图片后的事件

javascript - 如何使用 onclick 将 php 图像更改为另一个 php 图像

javascript - Rx 如何在 web 上真正工作(客户端)

javascript - Angular 7 响应解析器无法正常工作