scala - 使用有界并行性对 Scala Futures 进行排序(不要搞乱 ExecutorContexts)

标签 scala future rx-java

背景:我有一个功能:

  def doWork(symbol: String): Future[Unit]

它会启动一些副作用来获取数据并存储它,并在完成后完成一个 Future。但是,后端基础设施有使用限制,因此不能并行发出超过 5 个这些请求。我有一个我需要通过的 N 个符号列表:
  var symbols = Array("MSFT",...)

但我想对它们进行排序,以便同时执行不超过 5 个。鉴于:
  val allowableParallelism = 5

我目前的解决方案是(假设我正在使用 async/await):
  val symbolChunks = symbols.toList.grouped(allowableParallelism).toList
  def toThunk(x: List[String]) = () => Future.sequence(x.map(doWork))
  val symbolThunks = symbolChunks.map(toThunk)
  val done = Promise[Unit]()
  def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match {
    case Nil => done.success()
    case x::xs => x().onComplete(_ => procThunks(xs))
  }
  procThunks(symbolThunks)
  await { done.future }

但是,出于显而易见的原因,我对此并不十分满意。我觉得这应该可以通过折叠来实现,但是每次我尝试时,我最终都会热切地创建 Futures。我还使用 concatMap 尝试了一个带有 RxScala Observables 的版本,但这似乎也有点过分了。

有没有更好的方法来实现这一点?

最佳答案

我有示例如何使用 scalaz-stream 做到这一点。这是相当多的代码,因为需要将scala Future转换为scalaz Task(延迟计算的抽象)。但是需要将其添加到项目中一次。另一种选择是使用 Task 来定义“doWork”。我个人更喜欢构建异步程序的任务。

  import scala.concurrent.{Future => SFuture}
  import scala.util.Random
  import scala.concurrent.ExecutionContext.Implicits.global


  import scalaz.stream._
  import scalaz.concurrent._

  val P = scalaz.stream.Process

  val rnd = new Random()

  def doWork(symbol: String): SFuture[Unit] = SFuture {
    Thread.sleep(rnd.nextInt(1000))
    println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}")
  }

  val symbols = Seq("AAPL", "MSFT", "GOOGL", "CVX").
    flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}"))

  implicit class Transformer[+T](fut: => SFuture[T]) {
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
      import scala.util.{Failure, Success}
      import scalaz.syntax.either._
      Task.async {
        register =>
          fut.onComplete {
            case Success(v) => register(v.right)
            case Failure(ex) => register(ex.left)
          }
      }
    }
  }

  implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
    def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = {
      val actions =
        process.
          zipWith(f)((data, f) => f(data))

      val nestedActions =
        actions.map(P.eval)

      merge.mergeN(concurrencyLevel)(nestedActions)
    }
  }

  val workChannel = io.channel((s: String) => doWork(s).toTask)

  val process = Process.emitAll(symbols).concurrently(5)(workChannel)

  process.run.run

当您在范围内进行所有这些转换时,基本上您只需要:
  val workChannel = io.channel((s: String) => doWork(s).toTask)

  val process = Process.emitAll(symbols).concurrently(5)(workChannel)

非常简短且自我描述

关于scala - 使用有界并行性对 Scala Futures 进行排序(不要搞乱 ExecutorContexts),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27085085/

相关文章:

java - Scala 只接受 List 中的 String 或 Int 通用案例类

java - 将 Runnable 实现为接口(interface)的抽象类

java - RxJava门机制

netbeans - NetBeans Gradle Build中未使用Javadoc

scala - Scala 中的 Seq 和 IndexedSeq/LinearSeq 有什么区别?

scala - 获取一系列 Spark RDD 的列

scala - 如何从 RDD 创建 Spark 数据集

scala - Await.ready 和 Await.result 的区别

asynchronous - 从闭包调用异步函数

android-studio - Kotlin 和 RxJava2 zip 运算符 - 不能使用提供的参数调用以下函数