抽象问题
有没有什么方法可以按照外部可观察对象的原始顺序使用 mergeMap
的结果,同时仍然允许内部可观察对象并行运行?
更详细的解释
让我们看一下两个合并映射运算符:
-
...它需要一个映射回调,以及可以并发运行的内部可观察量的数量:
of(1, 2, 3, 4, 5, 6).pipe( mergeMap(number => api.get('/double', { number }), 3) );
在此处查看实际操作:https://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010
这将分别为
1
、2
和3
触发 3 个并行请求。一旦其中一个请求完成,它就会触发另一个对4
的请求。依此类推,始终保持 3 个并发请求,直到处理完所有值。但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能是乱序的。所以不是:
[2, 4, 6, 8, 10, 12]
...我们实际上可能得到:
[4, 2, 8, 10, 6, 12] // or any other permutation
-
...输入
concatMap
。此运算符确保可观察对象全部按原始顺序连接,以便:of(1, 2, 3, 4, 5, 6).pipe( concatMap(number => api.get('/double', { number })) );
...将始终产生:
[2, 4, 6, 8, 10, 12]
在此处查看实际操作:https://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010
这就是我们想要的,但现在请求不会并行运行。作为the documentation说:
concatMap
is equivalent tomergeMap
withconcurrency
parameter set to1
.
回到问题:是否有可能获得 mergeMap
的好处,从而可以并行运行给定数量的请求,同时仍然具有映射值以原始顺序发出?
我的具体问题
以上是对问题的抽象描述。当您知道手头的实际问题时,有时更容易推理问题,所以这里是:
我有一份必须发货的订单 list :
const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
我有一个实际运送订单的
shipOrder
方法。它返回一个Promise
:const shipOrder = orderNumber => api.shipOrder(orderNumber);
API 最多只能同时处理 5 个订单发货,所以我使用
mergeMap
来处理:from(orderNumbers).pipe( mergeMap(orderNumber => shipOrder(orderNumber), 5) );
订单发货后,我们需要打印其发货标签。我有一个
printShippingLabel
函数,根据已发货订单的订单号,该函数将打印其发货标签。所以我订阅了我们的 observable,并在输入值时打印运输标签:from(orderNumbers) .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5)) .pipe(orderNumber => printShippingLabel(orderNumber));
这行得通,但是现在运输标签打印顺序不对,因为
mergeMap
根据shipOrder
完成的时间发出值它的请求。我想要的是按照与原始列表相同的顺序打印标签。
这可能吗?
可视化
请参阅此处了解问题:https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010
您可以看到,较早的订单甚至在较晚的订单发货之前就已开始打印。
最佳答案
我确实设法部分解决了它,所以我将它张贴在这里作为我自己问题的答案。
我仍然非常想知道处理这种情况的规范方法。
复杂的解决方案
创建一个自定义运算符,它采用具有索引键的值(
{ index: number }
Typescript 说法),并保留值的缓冲区,仅根据它们的index
顺序发出它们。将原始列表映射到嵌入了
索引
的对象列表。将其传递给我们的自定义
sortByIndex
运算符。将值映射回它们的原始值。
这是 sortByIndex
的样子:
function sortByIndex() {
return observable => {
return Observable.create(subscriber => {
const buffer = new Map();
let current = 0;
return observable.subscribe({
next: value => {
if (current != value.index) {
buffer.set(value.index, value);
} else {
subscriber.next(value);
while (buffer.has(++current)) {
subscriber.next(buffer.get(current));
buffer.delete(current);
}
}
},
complete: value => subscriber.complete(),
});
});
};
}
有了 sortByIndex
运算符,我们现在可以完成整个管道:
of(1, 2, 3, 4, 5, 6).pipe(
map((number, index) => ({ number, index })),
mergeMap(async ({ number, index }) => {
const doubled = await api.get('/double', { number });
return { index, number: doubled };
}, 3),
sortByIndex(),
map(({ number }) => number)
);
在此处查看实际操作:https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010
创建一个concurrentConcat
操作符
事实上,有了这个 sortByIndex
运算符,我们现在可以创建一个通用的 concurrentConcat
运算符,它将对 { 索引进行转换: number, value: T
内部类型:
function concurrentConcat(mapper, parallel) {
return observable => {
return observable.pipe(
mergeMap(
mapper,
(_, value, index) => ({ value, index }),
parallel
),
sortByIndex(),
map(({ value }) => value)
);
};
}
然后我们可以使用此 concurrentConcat
运算符代替 mergeMap
,它现在将按其原始顺序发出值:
of(1, 2, 3, 4, 5, 6).pipe(
concurrentConcat(number => api.get('/double', { number }), 3),
);
在此处查看实际操作:https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010
所以要解决我最初的订单发货问题:
from(orderNumbers)
.pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
.subscribe(orderNumber => printShippingLabel(orderNumber));
在此处查看实际操作:https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010
您可以看到,即使较晚的订单最终可能会先于较早的订单发货,但标签始终按原始订单打印。
结论
这个解决方案甚至还不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个如此普遍的问题,我觉得必须有一种更简单的(内置的)方法来解决这个问题:|
关于javascript - 具有原始顺序的 RxJS mergeMap(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57045892/