scala - 多个 http 请求的 Akka 流程

标签 scala akka akka-stream akka-http

在我的一个项目中,我有一个 akka actor,用于向我的 google fcm 服务器发送帖子请求。参与者获取一个 id 列表,并应发出列表中尽可能多的请求。我在 runForeach(println(_)) 中打印出服务器的响应,但我只得到整个 id 列表的一个打印输出。为什么会出现这种情况?

class FCMActor(val key: String) extends Actor{
  import fcm.FCMActor._
  import akka.pattern.pipe
  import context.dispatcher

  private implicit def system: ActorSystem = ActorSystem()
  final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  def buildBody(id: Option[String]): String = {
    Json.obj(
      "to" -> id,
      "priority" -> "high",
      "data" -> Json.obj("message" -> "Firebase Clud Message"),
      "time_to_live" -> 60
    ).toString()
  }

  def buildHttpRequest(body: String): HttpRequest = {
    HttpRequest(method = HttpMethods.POST,
      uri = s"/fcm/send",
      entity = HttpEntity(MediaTypes.`application/json`, body),
      headers = List(RawHeader("Authorization", s"key=$key")))
  }

  val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = {
    Http().outgoingConnection("fcm.googleapis.com")
  }

  def send(ids: List[Option[String]]) = {

    val httpRequests: List[HttpRequest] = ids.map(buildBody).map(buildHttpRequest)
    println(httpRequests)

    Source(httpRequests).via(connectionFlow).runForeach(println(_)) // << here I only get one println
  }

  override def receive: Receive = {
    case SendToIds(ids: List[Option[String]]) =>
      send(ids)

  }
}

最佳答案

您没有使用服务器发送给您的响应实体。要了解为什么这很重要,请查看相关的 docs page .

尝试修复此问题的快速代码更改是:

... .runForeach{ response =>
  response.discardEntityBytes()
  println(response)
}

或者,如果您确实对该实体感兴趣,那么类似于

... .runForeach{ _.entity.dataBytes
  .runFold(ByteString.empty) { case (acc, b) => acc ++ b }
  .map(println(_))
}

关于scala - 多个 http 请求的 Akka 流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41397704/

相关文章:

mongodb - 不坚持 Scala None's 而不是坚持为空值

java - 具有 Java 监听器模式的 Scala 语法糖

heroku - 使用 Play 2 框架在 Heroku 中进行 Akka 调度

scala - 将元素从外部推送到 fs2 中的 react 流

scala - 有没有办法使用 Akka-Stream 获得可预测的 Actor 命名?

scala - 在 Scala 中表示为元组的树

scala - `sbt package` 导致巨大的 war 文件 - 如何使它更小?

scala - Akka/Scala 检测关闭的 TCP 连接?

akka - 为什么 Akka 在这个例子中等待?

json - Akka HTTP流JSON反序列化