typescript - RXJS 控制可观察调用

标签 typescript angular rxjs rxjs5

我使用 RxJs version 5在我的 Angular 2 项目中。 我想创建一些可观察对象,但我不希望立即调用这些可观察对象。

version 4您可以使用(例如)Controlled 来控制调用命令或 Pausable Buffers . 但是该功能在版本 5 中不可用(yet)。

我怎样才能在 RxJs 5 中获得这种功能?

我的最终目标是将创建的可观察对象放入队列并一个一个地调用它们。只有当前一个处理成功时才会调用下一个。 当一个失败时,队列被清空。

编辑

根据@Niklas Fasching 的评论,我可以使用 Publish 创建一个可行的解决方案操作。

JS Bin

// Queue to queue operations
const queue = [];

// Just a function to create Observers
function createObserver(id): Observer {
    return {
        next: function (x) {
            console.log('Next: ' + id + x);
        },
        error: function (err) {
            console.log('Error: ' + err);
        },
        complete: function () {
            console.log('Completed');
        }
    };
};

// Creates an async operation and add it to the queue
function createOperation(name: string): Observable {

  console.log('add ' + name);
  // Create an async operation
  var observable = Rx.Observable.create(observer => {
    // Some async operation
    setTimeout(() => 
               observer.next(' Done'), 
               500);
  });
  // Hold the operation
  var published = observable.publish();
  // Add Global subscribe
  published.subscribe(createObserver('Global'));
  // Add it to the queue
  queue.push(published);
  // Return the published so the caller could add a subscribe
  return published;
};

// Create 4 operations on hold
createOperation('SourceA').subscribe(createObserver('SourceA'));
createOperation('SourceB').subscribe(createObserver('SourceB'));
createOperation('SourceC').subscribe(createObserver('SourceC'));
createOperation('SourceD').subscribe(createObserver('SourceD'));

// Dequeue and run the first
queue.shift().connect();

最佳答案

使用 Rx4 的 controlled Observable 在订阅时仍然会被调用

RxJS 4 中的 controlled 运算符实际上只是控制 Observable after 运算符的流。到那时,它全部通过并缓冲该运算符(operator)。考虑一下:

(RxJS 4) http://jsbin.com/yaqabe/1/edit?html,js,console

const source = Rx.Observable.range(0, 5).do(x => console.log('do' + x)).controlled();

source.subscribe(x => console.log(x));

setTimeout(() => {
  console.log('requesting');
  source.request(2);
}, 1000);

您会注意到 Observable.range(0, 5) 中的所有五个值都由 do 立即发出...然后在获得两个值之前暂停一秒(1000 毫秒)。

所以,这真的是背压控制的错觉。最后,该运算符中有一个无界缓冲区。一个数组,收集它“上方”的 Observable 向下发送的所有内容,并等待您通过调用 request(n) 将其出队。


RxJS 5.0.0-beta.2 复制控制

在回答这个问题时,RxJS 5 中不存在 controlled 运算符。这是出于以下几个原因:1. 没有请求,以及 2. 它的名称显然令人困惑(因此这个关于 StackOverflow 的问题)

如何复制 RxJS 5 中的行为(目前):http://jsbin.com/metuyab/1/edit?html,js,console

// A subject we'll use to zip with the source
const controller = new Rx.Subject();

// A request function to next values into the subject
function request(count) {
  for (let i = 0; i < count; i++) {
    controller.next(count);
  }
}

// We'll zip our source with the subject, we don't care about what
// comes out of the Subject, so we'll drop that.
const source = Rx.Observable.range(0, 5).zip(controller, (x, _) => x);

// Same effect as above Rx 4 example
source.subscribe(x => console.log(x));

// Same effect as above Rx 4 example
request(3);

背压控制

目前,对于“真正的背压控制”,一种解决方案是 promise 迭代器。尽管 IoP 并非没有问题,一方面,每个回合都有一个对象分配。每个值都有一个与之关联的 Promise。另一方面,不存在取消,因为它是 promise 。

一个更好的、基于 Rx 的方法是让一个 Subject “提供”您的可观察链的顶部,而您在其余部分进行组合。

是这样的:http://jsbin.com/qeqaxo/2/edit?js,console

// start with 5 values
const controller = new Rx.BehaviorSubject(5);

// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)

const controlled = controller.flatMap(
      // map your count into a set of values
      (count) => source.take(count), 
      // additional mapping for metadata about when the block is done
      (count, value, _, index) => {
        return { value: value, done: count - index === 1 }; 
      })
      // when the block is done, request 5 more.
      .do(({done}) => done && controller.next(5))
      // we only care about the value for output
      .map(({value}) => value);


// start our subscription
controlled.subscribe(x => {
  console.log(x)
});

...在不久的将来,我们也有一些关于具有真正背压控制的可流动可观察类型的计划。对于这种情况,这将更加令人兴奋和更好。

关于typescript - RXJS 控制可观察调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35343183/

相关文章:

javascript - HttpInterceptor 根据其他 observable 的值更改响应主体

TypeScript:typeof T 与 { new(): T } 不兼容

javascript - @Output childEvent 未初始化

javascript - 在 ionic 2 和 angular 2 中添加传单弹出窗口的链接

angular - Ionic 4 自定义组件

javascript - RxJ 和 switchMap

javascript - 显示活跃用户图标 - 如何确定谁不再在线(寻找 NG 或 JS 解决方案)

angular - redux-observable 不会在 catchError 之后执行

jquery - 转向 Typescript,数字错误

typescript - 从方法装饰器获取方法签名