我正在创建pipeable operators使用 RxJS 6,并且不清楚当操作异步时如何complete()
观察者。
对于同步操作,逻辑很简单。在下面的示例中,源 Observable
中的所有值都将传递给 observer.next()
,然后是 observer.complete()
被调用。
const syncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => observer.next(x),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
但是,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 setTimeout()
的调用来表示。显然,在任何值传递给observer.next()之前,都会调用observer.complete()。
const asyncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => setTimeout(() => observer.next(x), 100),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
所以问题是:惯用的 RxJS 方法是什么,以便仅在所有值异步传递给 observer.next 后才调用
?我应该手动跟踪待处理的调用还是有更“ react 性”的解决方案?observer.complete()
()
(请注意,上面的示例是我的实际代码的简化,并且对 setTimeout()
的调用旨在表示“任何异步操作”。我正在寻找处理可管道运算符中的异步操作的通用方法,而不是关于如何处理 RxJS 中的延迟或超时的建议。)
最佳答案
一种思路可能是重构您的 asyncOp
以使用其他运算符,例如 mergeMap
。
这是使用此方法重现示例的代码
const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));
这是否值得考虑取决于您的 asyncOp
的作用。如果它是异步的,因为它依赖于一些回调,比如 https 调用或从文件系统读取,那么我认为这种方法可以工作,因为你可以将基于回调的函数转换为 Observable。
关于javascript - 防止 RxJS 中异步可管道运算符过早完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51516606/