我想通过一系列异步/网络操作(远程 HTTP 请求)移动对象数组来处理它们。
在其中一些操作中,我希望确保同时处理的项目不超过 X 个。
我怎样才能实现这一目标?
示例代码:
function someAsyncOp(item) {...} // returns a promise
var source = Rx.Observable.from([{item1},{item2},...])
source
.flatMap((item) => {
// I WANT THE FOLLOWING OPERATION TO BE EXECUTING
// ON AT MAX 10 ITEMS AT A TIME, NEXT ITEM SHOULD
// BE SUBMITTED ONLY WHEN A SLOT GETS FREED AS A
// RESULT OF THE PROMISE SUCCEEDING OR FAILING
return Rx.Observable.fromPromise(someAsyncOp(item))
})
.subscribe(
console.log,
console.error,
() => console.log('completed')
)
最佳答案
flatMap
有一个名为 flatMapWithMaxConcurrent
的兄弟,它采用并发参数。它在功能上类似于 Benjamin 的回答所建议的 map(fn).merge(n)
。
function someAsyncOp(item) {...} // returns a promise
var source = Rx.Observable.from([{item1},{item2},...])
source
//Only allow a max of 10 items to be subscribed to at once
.flatMapWithMaxConcurrent(10, (item) => {
//Since a promise is eager you need to defer execution of the function
//that produces it until subscription. Defer will implicitly accept a promise
return Rx.Observable.defer(() => someAsyncOp(item))
//If you want the whole thing to continue regardless of exceptions you should also
//catch errors from the individual processes
.catch(Rx.Observable.empty())
})
.subscribe(
console.log,
console.error,
() => console.log('completed')
)
关于javascript - Rx.js 与 Promise 的并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33844824/