scala - 使用带有 onComplete 的 Scala future 列表进行异步处理以进行异常处理

标签 scala asynchronous concurrency future

我正在尝试进行大量外部服务调用,每个调用都跟进异常处理和有条件的进一步处理。我认为在内部使用 .onComplete 来扩展这个不错的 ( Asynchronous IO in Scala with futures ) 示例会很容易,但似乎我对范围界定和/或 Futures 不了解。谁能指出我正确的方向?

#!/bin/bash
scala -feature $0 $@
exit
!#

import scala.concurrent.{future, blocking, Future, Await}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Failure}
import scala.language.postfixOps

val keylist = List("key1", "key2")

val myFuts: List[Future[String]] = keylist.map {
  myid => future {
    // this line simulates an external call which returns a future (retrieval from S3)
    val myfut = future { Thread.sleep(1); "START " + myid}

    var mystr = "This should have been overwritten"
    myfut.onComplete {
      case Failure(ex) => {
        println (s"failed with error: $ex")
        mystr = "FAILED"
      }
      case Success(myval) => {
        mystr = s"SUCCESS $myid: $myval"
        println (mystr)
      }
    }
    mystr
  }
}

val futset: Future[List[String]] = Future.sequence(myFuts)
println (Await.result(futset, 10 seconds))

在我的电脑(Scala 2.10.4)上,打印出:

SUCCESS key2: START key2
SUCCESS key1: START key1
List(This should have been overwritten, This should have been overwritten)

我想要(顺序不重要):

SUCCESS key2: START key2
SUCCESS key1: START key1
List(SUCCESS key2: START key2, SUCCESS key1: START key1)

最佳答案

我会避免使用 onComplete 并尝试使用它对可变变量执行副作用逻辑。相反,我会映射 future 并将失败案例处理为返回不同的值。这是您的代码的一个稍微修改的版本,在 Future 上使用 map(通过 for comprehension),然后使用 recover 来处理失败情况.希望这就是您要找的:

val keylist = List("key1", "key2")

val myFuts: List[Future[String]] = keylist.map {myid => 

  // this line simulates an external call which returns a future (retrieval from S3)
  val myfut = future { Thread.sleep(1); "START " + myid}
  val result = for (myval <- myfut) yield {
    val res = s"SUCCESS $myid: $myval"
    println(res)
    res
  }
  result.recover{
    case ex => 
      println (s"failed with error: $ex")
      "FAILED"          
  }

}

val futset: Future[List[String]] = Future.sequence(myFuts)
println (Await.result(futset, 10 seconds))

关于scala - 使用带有 onComplete 的 Scala future 列表进行异步处理以进行异常处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23358204/

相关文章:

java - 为什么可选的不用于实例变量?

scala - 如何使用 Scala DataFrameReader 选项方法

node.js - 从子进程到 API 的请求导致 ECONNRESET

scala - scalaz.concurrent.Future 与 scalaz.ContT[Trampoline, Unit, 相比有什么好处?]

go - 在 Golang 中测试一次条件

java - HashMap 中的每键锁而不是整个映射锁

sql - 优化 Slick 生成的 SQL 查询

javascript - 带有多个标记的谷歌地图 javascript => 标签文本在循环中无处不在

javascript - 在 Javascript 中将 Blob 同步转换为 base64 字符串

c++ - select() 和 recv 与 MSG_PEEK 之间的效率。异步