scala - 喷雾路线得到童星回应

标签 scala akka spray

我正在尝试弄清楚如何设置一个主Actor来调用适当的子级,以支持一些我试图模拟数据库调用的喷雾路线。我是 akka/Spray 的新手,所以只是想更好地了解如何正确设置 Spray -> Actor -> db 调用(等)。我可以从顶级 Actor 那里得到回应,但是当我尝试从父级以下的一个 Actor 级别得到回应时,我似乎无法得到任何工作。

当查看 Actor 的路径时,从我从喷涂路线调用电话的方式来看,我似乎是从临时 Actor 那里经过的。以下是我迄今为止解决此问题的内容。这一定只是用户错误/无知,只是不确定如何继续。任何建议,将不胜感激。

下面的演示喷射服务和 Redis Actor 代码片段显示了我从路线调用 Actor 的位置以及遇到问题的多个 Actor(希望我的路线从 SummaryActor 获得响应)。谢谢!

启动:

object Boot extends App {

  // we need an ActorSystem to host our application in
  implicit val system = ActorSystem("on-spray-can")

  // create and start our service actor
  val service = system.actorOf(Props[DemoServiceActor], "demo-service")

  implicit val timeout = Timeout(5.seconds)
  // start a new HTTP server on port 8080 with our service actor as the handler
  IO(Http) ? Http.Bind(service, interface = "localhost", port = 8080)
}

演示服务 Actor(用于喷雾)

class DemoServiceActor extends Actor with Api {

  // the HttpService trait defines only one abstract member, which
  // connects the services environment to the enclosing actor or test
  def actorRefFactory = context

  // this actor only runs our route, but you could add
  // other things here, like request stream processing
  // or timeout handling
  def receive = handleTimeouts orElse runRoute(route)

  //Used to watch for request timeouts
  //http://spray.io/documentation/1.1.2/spray-routing/key-concepts/timeout-handling/
  def handleTimeouts: Receive = {
    case Timedout(x: HttpRequest) =>
      sender ! HttpResponse(StatusCodes.InternalServerError, "Too late")
  }


}

//Master trait for handling large APIs
//http://stackoverflow.com/questions/14653526/can-spray-io-routes-be-split-into-multiple-controllers
trait Api extends DemoService {
  val route = {
    messageApiRouting
  }
}

演示喷雾服务(路线):

trait DemoService extends HttpService with Actor  {
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val redisActor = context.actorOf(Props[RedisActor], "redisactor")

  val messageApiRouting =
        path("summary" / Segment / Segment) { (dataset, timeslice) =>
          onComplete(getSummary(redisActor, dataset, timeslice)) {
            case Success(value) => complete(s"The result was $value")
            case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
          }
        }

  def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = Future {

    val dbMessage = DbMessage("summary", dataset + timeslice)
    val future = redisActor ? dbMessage
    val result = Await.result(future, timeout.duration).asInstanceOf[String]
    result
  }

}

Redis Actor(尚未模拟实际的 Redis 客户端)

class RedisActor extends Actor with ActorLogging {
  //  val pool = REDIS
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val summaryActor = context.actorOf(Props[SummaryActor], "summaryactor")


  def receive = {

    case msg: DbMessage => {
      msg.query match {
        case "summary" => {
          log.debug("Summary Query Request")
          log.debug(sender.path.toString)
           summaryActor ! msg
        }
      }
    }

    //If not match log an error
    case _ => log.error("Received unknown message: {} ")
  }
}

class SummaryActor extends Actor with ActorLogging{

  def receive = {
    case msg: DbMessage =>{
      log.debug("Summary Actor Received Message")
      //Send back to Spray Route

    }
  }
}

最佳答案

代码的第一个问题是,您需要从主 Actor 转发到子 Actor,以便 sender 能够正确传播并可供子 Actor 响应。因此,更改此设置(在 RedisActor 中):

summaryActor ! msg

致:

summaryActor forward msg

这是首要问题。解决这个问题,您的代码就应该开始工作了。不过还有一些事情需要注意。您的 getSummary 方法当前定义为:

def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = 
  Future {
    val dbMessage = DbMessage("summary", dataset + timeslice)
    val future = redisActor ? dbMessage
    val result = Await.result(future, timeout.duration).asInstanceOf[String]
    result
  }

这里的问题是 ask 操作 (?) 已经返回一个 Future,因此您正在阻止它以获取结果,将其包装在另一个 Future 中,以便您可以返回一个 FutureonComplete 使用。您应该能够通过直接使用从 ask 返回的 Future 来简化事情,如下所示:

def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = {
  val dbMessage = DbMessage("summary", dataset + timeslice)
  (redisActor ? dbMessage).mapTo[String]      
}

关于scala - 喷雾路线得到童星回应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30845356/

相关文章:

scala - future 不完整?

scala - Spark JDBC to DashDB (DB2) 与 CLOB 错误

scala - Akka Stream + Akka Http - 获取错误请求

scala - 从请求中提取 Spray.io Content-Type?

java - SSLHandshakeException:没有共同的密码套件 - 喷雾 jar SSL 配置

scala - REST (Squeryl/Akka/Spray) - 非常低的吞吐量

scala - 模式匹配 `@` 符号

scala - 使用特征时有没有办法避免重复类型参数?

scala - akka-persistence中persistenceId的唯一性

scala - 如何声明对 native .so 库的依赖以进行测试?