scala - Actor 的接收方法中的多个 Future 调用

标签 scala redis akka actor future

我正在尝试在 Actor 的 receive 方法中进行两次外部调用(对 Redis 数据库)。两个调用都返回一个 Future,我需要第二个中第一个 Future 的结果。 我将这两个调用都包装在 Redis 事务中,以避免其他人在我读取数据库时修改数据库中的值。

actor 的内部状态根据第二个 Future 的值进行更新。

这是我当前代码的样子,但我是不正确的,因为我正在 Future.onComplete 回调中更新 actor 的内部状态。

我不能使用 PipeTo 模式,因为我需要两个 Future 都必须在事务中。 如果我对第一个 Future 使用 Await,那么我的接收方法将阻塞。 知道如何解决这个问题吗?

我的第二个问题与我如何使用Future有关。下面这种 Future 的用法是否正确?一般来说,有没有更好的方法来处理多个 future ?想象一下,如果有 3 或 4 个 Future,每个 Future 都取决于前一个。

import akka.actor.{Props, ActorLogging, Actor}
import akka.util.ByteString
import redis.RedisClient

import scala.concurrent.Future
import scala.util.{Failure, Success}


object GetSubscriptionsDemo extends App {
  val akkaSystem = akka.actor.ActorSystem("redis-demo")
  val actor = akkaSystem.actorOf(Props(new SimpleRedisActor("localhost", "dummyzset")), name = "simpleactor")
  actor ! UpdateState
}

case object UpdateState

class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  //required by Future
  implicit val ctx = context dispatcher

  var rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => {
      log.info("Start of UpdateState ...")

      val tran = rClient.transaction()

      val zf: Future[Long] = tran.zcard(key)  //FIRST Future 
      zf.onComplete {

        case Success(z) => {
          //SECOND Future, depends on result of FIRST Future 
          val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 
          rf.onComplete {
            case Success(x) => {
              //convert ByteString to UTF8 String
              val v = x.map(_.utf8String)
              log.info(s"Updating state with $v ")
              //update actor's internal state inside callback for a Future
              //IS THIS CORRECT ?
              mutableState ++ v
            }
            case Failure(e) => {
              log.warning("ZRANGE future failed ...", e)
            }
          }
        }
        case Failure(f) => log.warning("ZCARD future failed ...", f)
      }
      tran.exec()

    }
  }

}

编译但是当我运行时它被击中了。

2014-08-07  INFO [redis-demo-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2014-08-07 04:38:35.106UTC INFO [redis-demo-akka.actor.default-dispatcher-3] e.c.s.e.a.g.SimpleRedisActor - Start of UpdateState ...
2014-08-07 04:38:35.134UTC INFO [redis-demo-akka.actor.default-dispatcher-8] r.a.RedisClientActor - Connect to localhost/127.0.0.1:6379
2014-08-07 04:38:35.172UTC INFO [redis-demo-akka.actor.default-dispatcher-4] r.a.RedisClientActor - Connected to localhost/127.0.0.1:6379

更新 1

为了使用 pipeTo 模式,我需要访问我所在的 actor 中的 tran 和 FIRST Future (zf)我将 Future 传递给因为第二个 Future 取决于 FIRST 的值 (z)。

    //SECOND Future, depends on result of FIRST Future 
      val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 

最佳答案

在不太了解您正在使用的 redis 客户端的情况下,我可以提供一个替代解决方案,该解决方案应该更简洁并且不会因关闭可变状态而出现问题。这个想法是使用一种主/ worker 的情况,其中主人(SimpleRedisActor)接收到完成工作的请求,然后委托(delegate)给执行工作并响应更新状态的 worker 。该解决方案看起来像这样:

object SimpleRedisActor{
  case object UpdateState
  def props(ip:String, key:String) = Props(classOf[SimpleRedisActor], ip, key)
}

class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {
  import SimpleRedisActor._
  import SimpleRedisWorker._

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  val rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => 
      log.info("Start of UpdateState ...")      
      val worker = context.actorOf(SimpleRedisWorker.props)
      worker ! DoWork(rClient, key)

    case WorkResult(result) =>
      mutableState ++ result

    case FailedWorkResult(ex) =>
      log.error("Worker got failed work result", ex)
  }
}

object SimpleRedisWorker{
  case class DoWork(client:RedisClient, key:String)
  case class WorkResult(result:Seq[String])
  case class FailedWorkResult(ex:Throwable)
  def props = Props[SimpleRedisWorker]
}

class SimpleRedisWorker extends Actor{
  import SimpleRedisWorker._
  import akka.pattern.pipe
  import context._

  def receive = {
    case DoWork(client, key) =>
      val trans = client.transaction()
      trans.zcard(key) pipeTo self
      become(waitingForZCard(sender, trans, key) orElse failureHandler(sender, trans))
  }

  def waitingForZCard(orig:ActorRef, trans:RedisTransaction, key:String):Receive = {      
    case l:Long =>
      trans.zrange(key, l -1, l) pipeTo self
      become(waitingForZRange(orig, trans) orElse failureHandler(orig, trans))
  }

  def waitingForZRange(orig:ActorRef, trans:RedisTransaction):Receive = {
    case s:Seq[ByteString] =>
      orig ! WorkResult(s.map(_.utf8String))
      finishAndStop(trans)
  }

  def failureHandler(orig:ActorRef, trans:RedisTransaction):Receive = {
    case Status.Failure(ex) => 
      orig ! FailedWorkResult(ex)
      finishAndStop(trans)   
  }

  def finishAndStop(trans:RedisTransaction) {
    trans.exec()
    context stop self
  }
}

工作人员启动事务,然后调用 redis 并最终在停止自身之前完成事务。当它调用 redis 时,它会获取 future 并通过管道返回给自己以继续处理,在两者之间更改 receive 方法作为显示其状态进展的机制。在这样的模型中(我想这有点类似于错误内核模式),主人拥有并保护状态,将“有风险”的工作委托(delegate)给可以弄清楚状态应该发生什么变化的 child ,但零钱仍归主人所有。

再说一次,我不知道您正在使用的 Redis 客户端的功能,也不知道它是否足够安全,甚至可以执行此类操作,但这不是重点。关键是展示一个更安全的结构来做这样的事情,涉及需要安全更改的 future 和状态。

关于scala - Actor 的接收方法中的多个 Future 调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25174504/

相关文章:

sql - Spark SQL : put the conditional count result into a new column

python - 在redis中存储两组表

scala - Akka-http 流使用 Slick 3.0 Databasepublisher

pandas - Spark 与 Scala 和 Pandas

scala - 案例类构造函数中继承的抽象类型

scala - Redis 与 Redisson 框架在 Scala 中的不可预测行为

scala - Akka Actors 可以替代服务层吗?

scala - Akka 基于 Actor 的自定义事件总线实现导致瓶颈

scala - Sqs Akka Stream 内存不足

laravel - [] 没有连接器