javascript - RXJS:我可以根据单个输出的条目顺序交错多个流吗?

标签 javascript rxjs

这纯粹是了解运算符的学术练习。如何使用 RXJS 交错传入流

例如从这里开始:

// RxJS v6+
import { of } from 'rxjs';
//emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
//output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));

对此:

import { of } from 'rxjs';

const source1 = of(1,3,5,7,9);
const source2 = of(2,4,6,8,10);

// a simple merge will just append source1 and source2
// how do I obtain an output = 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2).subscribe(val => console.log(val))

约束:这是一个一般性问题,因此我不想使用基于此特定示例的奇数/偶数分隔的谓词,而是纯粹基于传入元素的顺序创建输出,即从每个流中获取第一个,然后第二个,依此类推

这可能吗?

最佳答案

执行此操作的标准方法确实是 merge函数,它可以接收任意数量的可观察量作为参数,并按顺序发出它们的所有项目,无论函数参数中可观察量的顺序如何。但正如您所指出的,它附加了结果,其原因不是合并问题,而是实际上 of同步发出这些值,因此它们都在同一个 javascript 循环中按顺序运行。您可以通过使用异步发出值的 observable 来更改它,或者使用一些可用的异步创建一个 observable Schedulers

异步可观察:

我创建了这个 asyncOf 函数作为 Observable 的模型,其值随着时间的推移而发出,就像 websocket 客户端或用户交互一样。

import { merge, Observable, EMPTY } from 'rxjs';
import { toArray } from 'rxjs/operators';

const asyncOf = <T = any>(args: T[], interval = 500): Observable<T> => {
  let i = 0;
  let intervalRef;

  if(!args?.length) {
    return EMPTY;
  }
 
  return new Observable(observer => {
    intervalRef = setInterval(() => {
      if(args[i]) {
        observer.next(args[i])
      }
      ++i;

      if(!args[i]) {
        observer.complete()
        if(intervalRef) {
           clearInterval(intervalRef)
        }
      }
    }, interval)
  })
}

const source1 = asyncOf([1, 3, 5, 7, 9]);
const source2 = asyncOf([2, 4, 6, 8, 10]);

//output: 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2)
  //.pipe(toArray()) //collect all emmited values and emmit an array when the observable completes
  .subscribe((val) => console.log(val));

在这种情况下,合并工作正常,因为两个可观察对象都异步运行。

异步调度程序

另一种选择是使用一个异步调度程序(或创建您自己的)来描述每个项目应何时被发出。这更接近您的示例,并且更深入地了解 rxjs:

import { merge, scheduled, asyncScheduler } from 'rxjs';
import { toArray } from 'rxjs/operators';

const source1 = scheduled([1, 3, 5, 7, 9], asyncScheduler);
const source2 = scheduled([2, 4, 6, 8, 10], asyncScheduler);

//output: 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2)
  //.pipe(toArray()) //collect all emmited values and emmit an array when the observable completes
  .subscribe((val) => console.log(val));

Scheduled来自 RXJS 6.5+ 并弃用在其他函数中使用调度程序函数,例如 of

关于javascript - RXJS:我可以根据单个输出的条目顺序交错多个流吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72856926/

相关文章:

javascript - CSS:如何根据屏幕/窗口大小设置宽度和高度?

javascript - 使 .toggleClass() 在 jQuery 中单击时仅影响具有相同类的一个元素

javascript - rxjs 中的 Observable 和 Subject 有什么区别?

javascript - 为状态管理发布插件效果

javascript - 手动添加节点到 JavaScript InfoVis Toolkit Force Directed Graph

javascript - 无法在 Google map 脚本标记中使用回调

javascript - JavaScript 常量存储在浏览器中的哪里?

angular - 如何正确捕获@ngrx/effects 错误?

rxjs - BehaviorSubject 映射一个 BehaviorSubject

javascript - 是否有机会从 RxJs 'next' 中省略 'subscribe' 部分