背景:我有一个功能:
def doWork(symbol: String): Future[Unit]
它会启动一些副作用来获取数据并存储它,并在完成后完成一个 Future。但是,后端基础设施有使用限制,因此不能并行发出超过 5 个这些请求。我有一个我需要通过的 N 个符号列表:
var symbols = Array("MSFT",...)
但我想对它们进行排序,以便同时执行不超过 5 个。鉴于:
val allowableParallelism = 5
我目前的解决方案是(假设我正在使用 async/await):
val symbolChunks = symbols.toList.grouped(allowableParallelism).toList
def toThunk(x: List[String]) = () => Future.sequence(x.map(doWork))
val symbolThunks = symbolChunks.map(toThunk)
val done = Promise[Unit]()
def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match {
case Nil => done.success()
case x::xs => x().onComplete(_ => procThunks(xs))
}
procThunks(symbolThunks)
await { done.future }
但是,出于显而易见的原因,我对此并不十分满意。我觉得这应该可以通过折叠来实现,但是每次我尝试时,我最终都会热切地创建 Futures。我还使用 concatMap 尝试了一个带有 RxScala Observables 的版本,但这似乎也有点过分了。
有没有更好的方法来实现这一点?
最佳答案
我有示例如何使用 scalaz-stream 做到这一点。这是相当多的代码,因为需要将scala Future转换为scalaz Task(延迟计算的抽象)。但是需要将其添加到项目中一次。另一种选择是使用 Task 来定义“doWork”。我个人更喜欢构建异步程序的任务。
import scala.concurrent.{Future => SFuture}
import scala.util.Random
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz.stream._
import scalaz.concurrent._
val P = scalaz.stream.Process
val rnd = new Random()
def doWork(symbol: String): SFuture[Unit] = SFuture {
Thread.sleep(rnd.nextInt(1000))
println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}")
}
val symbols = Seq("AAPL", "MSFT", "GOOGL", "CVX").
flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}"))
implicit class Transformer[+T](fut: => SFuture[T]) {
def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
import scala.util.{Failure, Success}
import scalaz.syntax.either._
Task.async {
register =>
fut.onComplete {
case Success(v) => register(v.right)
case Failure(ex) => register(ex.left)
}
}
}
}
implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = {
val actions =
process.
zipWith(f)((data, f) => f(data))
val nestedActions =
actions.map(P.eval)
merge.mergeN(concurrencyLevel)(nestedActions)
}
}
val workChannel = io.channel((s: String) => doWork(s).toTask)
val process = Process.emitAll(symbols).concurrently(5)(workChannel)
process.run.run
当您在范围内进行所有这些转换时,基本上您只需要:
val workChannel = io.channel((s: String) => doWork(s).toTask)
val process = Process.emitAll(symbols).concurrently(5)(workChannel)
非常简短且自我描述
关于scala - 使用有界并行性对 Scala Futures 进行排序(不要搞乱 ExecutorContexts),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27085085/