javascript - 具有原始顺序的 RxJS mergeMap()

标签 javascript rxjs rxjs6 rxjs-pipeable-operators

抽象问题

有没有什么方法可以按照外部可观察对象的原始顺序使用 mergeMap 的结果,同时仍然允许内部可观察对象并行运行?


更详细的解释

让我们看一下两个合并映射运算符:

  • mergeMap

    ...它需要一个映射回调,以及可以并发运行的内部可观察量的数量:

      of(1, 2, 3, 4, 5, 6).pipe(
          mergeMap(number => api.get('/double', { number }), 3)
      );
    

    在此处查看实际操作:https://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010

    这将分别为 123 触发 3 个并行请求。一旦其中一个请求完成,它就会触发另一个对 4 的请求。依此类推,始终保持 3 个并发请求,直到处理完所有值。

    但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能是乱序的。所以不是:

      [2, 4, 6, 8, 10, 12]
    

    ...我们实际上可能得​​到:

      [4, 2, 8, 10, 6, 12] // or any other permutation
    
  • concatMap

    ...输入concatMap。此运算符确保可观察对象全部按原始顺序连接,以便:

      of(1, 2, 3, 4, 5, 6).pipe(
          concatMap(number => api.get('/double', { number }))
      );
    

    ...将始终产生:

      [2, 4, 6, 8, 10, 12]
    

    在此处查看实际操作:https://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010

    这就是我们想要的,但现在请求不会并行运行。作为the documentation说:

    concatMap is equivalent to mergeMap with concurrency parameter set to 1.

回到问题:是否有可能获得 mergeMap 的好处,从而可以并行运行给定数量的请求,同时仍然具有映射值以原始顺序发出?


我的具体问题

以上是对问题的抽象描述。当您知道手头的实际问题时,有时更容易推理问题,所以这里是:

  1. 我有一份必须发货的订单 list :

     const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
  2. 我有一个实际运送订单的 shipOrder 方法。它返回一个 Promise:

     const shipOrder = orderNumber => api.shipOrder(orderNumber);
    
  3. API 最多只能同时处理 5 个订单发货,所以我使用 mergeMap 来处理:

     from(orderNumbers).pipe(
         mergeMap(orderNumber => shipOrder(orderNumber), 5)
     );
    
  4. 订单发货后,我们需要打印其发货标签。我有一个 printShippingLabel 函数,根据已发货订单的订单号,该函数将打印其发货标签。所以我订阅了我们的 observable,并在输入值时打印运输标签:

     from(orderNumbers)
         .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
         .pipe(orderNumber => printShippingLabel(orderNumber));
    
  5. 这行得通,但是现在运输标签打印顺序不对,因为 mergeMap 根据 shipOrder 完成的时间发出值它的请求。我想要的是按照与原始列表相同的顺序打印标签

这可能吗?


可视化

请参阅此处了解问题:https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010

您可以看到,较早的订单甚至在较晚的订单发货之前就已开始打印。

最佳答案

我确实设法部分解决了它,所以我将它张贴在这里作为我自己问题的答案。

我仍然非常想知道处理这种情况的规范方法。


复杂的解决方案

  1. 创建一个自定义运算符,它采用具有索引键的值({ index: number } Typescript 说法),并保留值的缓冲区,仅根据它们的 index 顺序发出它们。

  2. 将原始列表映射到嵌入了索引的对象列表。

  3. 将其传递给我们的自定义 sortByIndex 运算符。

  4. 将值映射回它们的原始值。

这是 sortByIndex 的样子:

function sortByIndex() {
    return observable => {
        return Observable.create(subscriber => {
            const buffer = new Map();
            let current = 0;
            return observable.subscribe({
                next: value => {
                    if (current != value.index) {
                        buffer.set(value.index, value);
                    } else {
                        subscriber.next(value);
                    
                        while (buffer.has(++current)) {
                            subscriber.next(buffer.get(current));
                            buffer.delete(current);
                        }
                    }
                },
                complete: value => subscriber.complete(),
            });
        });
    };
}

有了 sortByIndex 运算符,我们现在可以完成整个管道:

of(1, 2, 3, 4, 5, 6).pipe(
    map((number, index) => ({ number, index })),
    mergeMap(async ({ number, index }) => {
        const doubled = await api.get('/double', { number });
        return { index, number: doubled };
    }, 3),
    sortByIndex(),
    map(({ number }) => number)
);

在此处查看实际操作:https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010

创建一个concurrentConcat操作符

事实上,有了这个 sortByIndex 运算符,我们现在可以创建一个通用的 concurrentConcat 运算符,它将对 { 索引进行转换: number, value: T 内部类型:

function concurrentConcat(mapper, parallel) {
    return observable => {
        return observable.pipe(
            mergeMap(
                mapper,
                (_, value, index) => ({ value, index }),
                parallel
            ),
            sortByIndex(),
            map(({ value }) => value)
        );
    };
}

然后我们可以使用此 concurrentConcat 运算符代替 mergeMap,它现在将按其原始顺序发出值:

of(1, 2, 3, 4, 5, 6).pipe(
    concurrentConcat(number => api.get('/double', { number }), 3),
);

在此处查看实际操作:https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010

所以要解决我最初的订单发货问题:

from(orderNumbers)
    .pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
    .subscribe(orderNumber => printShippingLabel(orderNumber));

在此处查看实际操作:https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010

您可以看到,即使较晚的订单最终可能会先于较早的订单发货,但标签始终按原始订单打印。


结论

这个解决方案甚至还不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个如此普遍的问题,我觉得必须有一种更简单的(内置的)方法来解决这个问题:|

关于javascript - 具有原始顺序的 RxJS mergeMap(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57045892/

相关文章:

JavaScript 如何创建全局对象

Rxjs - 如何提取数组中的多个值并将它们同步反馈给可观察流

angular - 如何在 RxJS 6 的管道中启动 observable?

javascript - rxjs/scheduler/VirtualTimeScheduler.js 404 未找到

javascript - Javascript如何实现日期比较运算符?

javascript - ExtJS视口(viewport)产生空白页面

javascript - 什么是 jQuery 与 Ajax.Request 以及 onSuccess 和 onComplete 的等价物?

javascript - RxJS5 与延迟可观察值结合的高级示例

javascript - 如何从 CycleJS 中的 POST 请求获取文件

javascript - 如何用 jest 测试 rxjs ajax 调用?