javascript - Rx.js 与 Promise 的并发

标签 javascript concurrency promise rxjs

我想通过一系列异步/网络操作(远程 HTTP 请求)移动对象数组来处理它们。

在其中一些操作中,我希望确保同时处理的项目不超过 X 个。

我怎样才能实现这一目标?

示例代码:

function someAsyncOp(item) {...} // returns a promise

var source = Rx.Observable.from([{item1},{item2},...])
source
  .flatMap((item) => {

    // I WANT THE FOLLOWING OPERATION TO BE EXECUTING  
    // ON AT MAX 10 ITEMS AT A TIME, NEXT ITEM SHOULD
    // BE SUBMITTED ONLY WHEN A SLOT GETS FREED AS A 
    // RESULT OF THE PROMISE SUCCEEDING OR FAILING

    return Rx.Observable.fromPromise(someAsyncOp(item))

  })
  .subscribe(
    console.log, 
    console.error, 
    () => console.log('completed')
  )

最佳答案

flatMap 有一个名为 flatMapWithMaxConcurrent 的兄弟,它采用并发参数。它在功能上类似于 Benjamin 的回答所建议的 map(fn).merge(n)

function someAsyncOp(item) {...} // returns a promise

var source = Rx.Observable.from([{item1},{item2},...])
source
   //Only allow a max of 10 items to be subscribed to at once
  .flatMapWithMaxConcurrent(10, (item) => {

    //Since a promise is eager you need to defer execution of the function
    //that produces it until subscription. Defer will implicitly accept a promise
    return Rx.Observable.defer(() => someAsyncOp(item))

    //If you want the whole thing to continue regardless of exceptions you should also
    //catch errors from the individual processes
                        .catch(Rx.Observable.empty())
  })
  .subscribe(
    console.log, 
    console.error, 
    () => console.log('completed')
  )

关于javascript - Rx.js 与 Promise 的并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33844824/

相关文章:

javascript - 选中复选框后导航到动态创建的 Backbone 路径

javascript - 如何正确地从 Promise 返回多个值?

javascript - Await 返回 [Function] 而不是值

javascript - Python3 中的 Futures 和 ES6 中的 Promises 的区别

javascript - 变量已经定义

javascript - lodash绑定(bind)函数用作jQuery事件处理程序...可能吗?

javascript - 调用 jQuery 插件而不指定任何元素

java - 假设我知道我将在 x64 cpus 上运行,我可以忽略哪些 JVM 同步实践?

c++ - 是否有一个真正有效的示例显示 x86_64 上 Store-Load 重新排序的副作用?

c++ - MinGW 4.6.2 std::原子指针