javascript - 如何启动和停止 RXJS 中的可观察间隔?

标签 javascript rxjs

我有一个非常简单的 timeInterval 可观察对象,我想在不断开订阅者的情况下开始/停止传输(无论可观察状态如何,订阅者都应该坐下来等待)。有可能吗?如果可能的话怎么办?

var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .take(10);

  var subscription = source.subscribe(
  function (x) {
     $("#result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#result").append('Error: ' + err);
  },
  function () {
    $("#result").append('Completed');
  });

一般评论:我看到的大多数示例都展示了如何定义可观察对象和订阅者。我如何影响现有对象的行为?

最佳答案

取决于停止/恢复信号的来源。我能想到的最简单的方法是使用 pausable operator ,正如文档所说,它更适合热可观察对象。所以在下面的示例代码中,我删除了 take(10)(您的暂停信号现在来自 pauser 主题),并添加了 share将您的 Observable 变成一个热点。

var pauser = new Rx.Subject();
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .share()
  .pausable(pauser);

var subscription = source.subscribe(
  function (x) {
     $("#result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#result").append('Error: ' + err);
  },
  function () {
    $("#result").append('Completed');
});

  // To begin the flow
pauser.onNext(true); // or source.resume();

// To pause the flow at any point
pauser.onNext(false);  // or source.pause();

这是一个more sophisticated example这将每 10 个项目暂停您的来源:

// Helper functions
function emits ( who, who_ ) {return function ( x ) {
 who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}

var pauser = new Rx.Subject();
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .share();
var pausableSource = source
  .pausable(pauser);

source
  .scan(function (acc, _){return acc+1}, 0)
  .map(function(counter){return !!(parseInt(counter/10) % 2)})
  .do(emits(ta_validation, 'scan'))
  .subscribe(pauser);

var subscription = pausableSource.subscribe(
  function (x) {
     $("#ta_result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#ta_result").append('Error: ' + err);
  },
  function () {
    $("#ta_result").append('Completed');
});

您现在应该已经回答了第二个问题。将给定的可观察对象与相关的 RxJS 运算符相结合,以实现您的用例。这就是我在这里所做的。

关于javascript - 如何启动和停止 RXJS 中的可观察间隔?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34996725/

相关文章:

javascript - 订阅主题完成后退订

rxjs - Observable 最终没有被解雇

javascript - 有条件激活的 rxjs 可观察速率限制器

javascript - 检测 DOM 元素是否确实具有 IE8 中定义的属性

javascript - 如何使用 Azure 函数和 HTTP 触发器 POST 更新 Cosmos Azure (NOSQL) 中的数据

javascript - 如何从 selected(different id, same name) datalist (html5) 选项中获取 data-id 属性的值?

javascript - React-Native 日历 : Pass date into custom function when a day is pressed

javascript - 结合两个可观察对象,其中一个是可选的

RXJs 服务返回多个 observables

javascript - 在 Rails 中的 .js.erb 中使用 ruby​​ 编码时, Assets 管道无法返回 @variables