在 Akka 中,我想向集群中的参与者发送“状态”消息以了解他们的状态。这些参与者可能处于各种健康状态,包括死亡/无法响应。
我想等待一段时间,比如 10 秒,然后继续处理我在该时间限制内碰巧收到的任何结果。我不想让整个事情失败,因为 1 或 2 个有问题并且没有在 10 秒内响应/超时。
我试过这个:
object GetStats {
def unapply(n: ActorRef )(implicit system: ActorSystem): Option[Future[Any]] = Try {
implicit val t: Timeout = Timeout(10 seconds)
n ? "A"
}.toOption
}
...
val z = List(a,b,c,d) // where a-d are ActorRefs to nodes I want to status
val q = z.collect {
case GetStats(s) => s
}
// OK, so here 'q' is a List[Future[Any]]
val allInverted = Future.sequence(q) // now we have Future[List[Any]]
val ok = Await.result(allInverted, 10 seconds).asInstanceOf[List[String]]
println(ok)
这段代码的问题在于,如果 1 个或多个没有响应,它似乎会抛出 TimeoutException。然后我无法获得确实返回的响应。
最佳答案
假设您确实需要每 10 秒至少收集部分统计信息 - 解决方案是将“无响应”转换为实际故障。
要实现这一点,只需增加 Await
与 implicit val t:Timeout
相比有点超时问。之后,您的 future 本身(从 ?
返回)将提前失败。所以你可以recover他们:
// Works only when AskTimeout >> AwaitTimeout
val qfiltered = q.map(_.map(Some(_)).recover{case _ => None}) //it's better to match TimeoutException here instead of `_`
val allInverted = Future.sequence(q).map(_.flatten)
例子:
scala> class MyActor extends Actor{ def receive = {case 1 => sender ! 2; case _ =>}}
defined class MyActor
scala> val a = sys.actorOf(Props[MyActor])
a: akka.actor.ActorRef = Actor[akka://1/user/$c#1361310022]
scala> implicit val t: Timeout = Timeout(1 seconds)
t: akka.util.Timeout = Timeout(1 second)
scala> val l = List(a ? 1, a ? 100500).map(_.map(Some(_)).recover{case _ => None})
l: List[scala.concurrent.Future[Option[Any]]] = List(scala.concurrent.impl.Promise$DefaultPromise@7faaa183, scala.concurrent.impl.Promise$DefaultPromise@1b51e0f0)
scala> Await.result(Future.sequence(l).map(_.flatten), 3 seconds)
warning: there were 1 feature warning(s); re-run with -feature for details
res29: List[Any] = List(2)
如果您想知道哪个 Future 没有响应 - 删除
flatten
.接收部分响应应该足以连续收集统计信息,就好像某个工作角色没有及时响应一样 - 下次它将使用实际数据进行响应,并且不会丢失任何数据。但是你应该正确地处理 worker 的生命周期,而不是松散(如果重要的话)actor 本身内部的任何数据。
如果超时的原因只是对系统的高压 - 您可以考虑:
如果这种超时的原因是一些远程存储 - 如果客户端准备好,那么部分响应是处理它的正确方法。例如,WebUI 可能会使用一些旋转的东西警告用户显示的数据可能未满。但是不要忘记不要用存储请求阻塞 actor( future 可能会有所帮助),或者至少将它们移动到单独的线程池中。
如果工作人员因失败(如异常)而没有响应 - 您仍然可以从您的
preRestart
向发件人发送通知- 所以你也可以收到 worker 没有统计数据的原因。这里唯一的事情 - 您应该检查发件人是否可用( it may not be )附言我希望你不要这样做
Await.result
在某个 actor 内部 - 至少为了您的应用程序性能,不建议阻止 actor。在某些情况下,它甚至可能导致死锁或内存泄漏。因此,应将 await 放置在系统外观的某个位置(如果底层框架不支持 future )。所以异步处理你的答案可能是有意义的(如果某些参与者没有响应,你仍然需要从失败中恢复它们):
//actor:
val parent = sender
for(list <- Future.sequence(qfiltered)) {
parent ! process(list)
}
//in sender (outside of the actors):
Await(actor ? Get, 10 seconds)
关于scala - Akka有什么好办法等待一群Actors回应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28255515/