javascript - 防止 RxJS 中异步可管道运算符过早完成

标签 javascript asynchronous rxjs rxjs-pipeable-operators

我正在创建pipeable operators使用 RxJS 6,并且不清楚当操作异步时如何complete()观察者。

对于同步操作,逻辑很简单。在下面的示例中,源 Observable 中的所有值都将传递给 observer.next(),然后是 observer.complete()被调用。

const syncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => observer.next(x),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

但是,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 setTimeout() 的调用来表示。显然,在任何值传递给observer.next()之前,都会调用observer.complete()。

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => setTimeout(() => observer.next(x), 100),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

所以问题是:惯用的 RxJS 方法是什么,以便仅在所有值异步传递给 observer.next 后才调用 observer.complete() ()?我应该手动跟踪待处理的调用还是有更“ react 性”的解决方案?

(请注意,上面的示例是我的实际代码的简化,并且对 setTimeout() 的调用旨在表示“任何异步操作”。我正在寻找处理可管道运算符中的异步操作的通用方法,而不是关于如何处理 RxJS 中的延迟或超时的建议。)

最佳答案

一种思路可能是重构您的 asyncOp 以使用其他运算符,例如 mergeMap

这是使用此方法重现示例的代码

const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));

这是否值得考虑取决于您的 asyncOp 的作用。如果它是异步的,因为它依赖于一些回调,比如 https 调用或从文件系统读取,那么我认为这种方法可以工作,因为你可以将基于回调的函数转换为 Observable。

关于javascript - 防止 RxJS 中异步可管道运算符过早完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51516606/

相关文章:

javascript - RXJS 通过属性 "id"组合/合并两个 Observable 数组

javascript - 关于 Promise 异步 JavaScript 的问题

JavaScript:了解图像何时完全加载

node.js - 从大型 MongoDB 集合动态生成 Mocha 测试

javascript - $(document).ready() 是在主线程中执行,还是异步?

node.js - 动态异步并行任务

angular - 如何使用 RxJs 处理/排队多个相同的 http 请求?

javascript - 如何制作图像轮播控件 'next'和 'previous'

javascript - jQueryUI - Accordion - 使用 ID 设置事件

typescript - RxJS:沿可观察链传递数据的简洁方法?