我正在学习 RxJS。我对下面的代码片段有疑问。
var arr = [1,2,3,4,5];
var arraysource = Observable.from(arr);
arr.push(6);
var subscription = arraysource.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));
arr.push(7);
当我运行上面的代码时,我得到以下输出。
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onCompleted
我的问题是为什么订阅后添加的第七个元素没有发布?是因为输入流是冷流并且它同步读取项目吗?那么 onComplete 火灾后添加的项目永远不会到达观察者?有人可以解释一下这种行为吗?
最佳答案
您可以创建自己的 Observable 来获取您正在寻找的功能
Rx 5 Beta(将 Rx 4 的 next
和 complete
更改为 onNext
onCompleted
)
var source = Rx.Observable.create(function (observer) {
[1,2,3,4,5].forEach(item => observer.next(item));
observer.next(6);
// observer.complete() // <-- remove comment to allow observable to complete
// Any cleanup logic might go here
return function () {
console.log('disposed');
};
});
var subscription = source.subscribe(
function (x) { console.log('onNext: %s', x); },
function (e) { console.log('onError: %s', e); },
function () { console.log('onCompleted'); }
);
示例http://jsbin.com/datuqoniyo/edit?js,console
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create
关于javascript - RxJS 将新项目添加到数组流未发布给订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37735721/