scala - akka Stream 将 akka-htpp Web 请求调用集成到流中

标签 scala akka akka-stream akka-http

开始使用 Akka Streams 我想执行一个简单的计算。扩展基本快速入门 https://doc.akka.io/docs/akka/2.5/stream/stream-quickstart.html调用 Restful Web api:

val source: Source[Int, NotUsed] = Source(1 to 100)
source.runForeach(println)

已经可以很好地打印数字了。但是当尝试创建一个 Actor 来执行 HTTP 请求时(这实际上有必要吗?),根据 https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-integrations.html

  import akka.pattern.ask
  implicit val askTimeout = Timeout(5.seconds)
  val words: Source[String, NotUsed] =
    Source(List("hello", "hi"))

  words
    .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
    // continue processing of the replies from the actor
    .map(_.toLowerCase)
    .runWith(Sink.ignore)

我无法编译它,因为 ? 运算符未定义。据我所知,这个只能在 Actor 内部定义。 我还不明白在 mapAsync 内部到底哪里需要调用我的自定义 Actor。

编辑

https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/至少包含示例的一部分。 看起来创建一个 actor 并不是强制的,即

implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()


val source = Source(List("232::03::14062::19965186", "232::03::14062::19965189"))
    .map(cellKey => {
      val splits = cellKey.split("::")
      val mcc = splits(0)
      val mnc = splits(1)
      val lac = splits(2)
      val ci = splits(3)
      CellKeySource(cellKey, mcc, mnc, lac, ci)
    })
    .limit(2)
    .mapAsyncUnordered(2)(ck => getResponse(ck.cellKey, ck.mobileCountryCode, ck.mobileNetworkCode, ck.locationArea, ck.cellKey)("<<myToken>>"))

  def getResponse(cellKey: String, mobileCountryCode:String, mobileNetworkCode:String, locationArea:String, cellId:String)(token:String): Future[String] = {
    RestartSource.withBackoff(
      minBackoff = 10.milliseconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2,
      maxRestarts = 2
    ) { () =>
      val responseFuture: Future[HttpResponse] =
        Http().singleRequest(HttpRequest(uri = s"https://www.googleapis.com/geolocation/v1/geolocate?key=${token}", entity = ByteString(
          // TODO use proper JSON objects
          s"""
             |{
             |  "cellTowers": [
             |    "mobileCountryCode": $mobileCountryCode,
             |    "mobileNetworkCode": $mobileNetworkCode,
             |    "locationAreaCode": $locationArea,
             |    "cellId": $cellId,
             |  ]
             |}
          """.stripMargin)))

      Source.fromFuture(responseFuture)
        .mapAsync(parallelism = 1) {
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
            Unmarshal(entity).to[String]
          case HttpResponse(statusCode, _, _, _) =>
            throw WebRequestException(statusCode.toString() )
        }
    }
      .runWith(Sink.head)
      .recover {
        case _ => throw StreamFailedAfterMaxRetriesException()
      }
  }

val done: Future[Done] = source.runForeach(println)
done.onComplete(_ ⇒ system.terminate())

已经是问题的(部分)答案,即如何集成 Akka-streams + akka-http。但是,它不起作用,即仅抛出错误 400,并且永远不会终止。

最佳答案

  1. 我认为您已经找到了 api如何调用akka-http客户端

  2. 关于您的第一个代码片段不起作用。我认为对这个例子本身发生了一些误解。您希望示例中的代码在复制后即可工作。但该文档的目的只是演示一个示例/概念,即如何将一些长时间运行的任务从流中委托(delegate)出去,然后在结果准备好时使用它。为此,使用了对akka actor的ask调用,因为对ask方法的调用会返回一个Future。也许该文档的作者只是省略了 Actor 的定义。你可以试试这个例子:

    import java.lang.System.exit
    
    import akka.NotUsed
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import akka.util.Timeout
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.language.higherKinds
    
    object App extends scala.App {
    
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
    
      val ref: ActorRef = sys.actorOf(Props[Translator])
    
      implicit val askTimeout: Timeout = Timeout(5.seconds)
      val words: Source[String, NotUsed] = Source(List("hello", "hi"))
    
      words
        .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
        .map(_.toLowerCase)
        .runWith(Sink.foreach(println))
        .onComplete(t => {
          println(s"finished: $t")
          exit(1)
        })
    }
    
    class Translator extends Actor {
    
      override def receive: Receive = {
        case msg => sender() ! s"$msg!"
      }
    }
    

关于scala - akka Stream 将 akka-htpp Web 请求调用集成到流中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53806122/

相关文章:

scala - 在 Slurm 上运行 Spark

scala - 如何使用类型化数据集将多值列拆分为单独的行?

scala - 在 Scala IDE 中使用 Akka

java - 使用 Java 在 akka 测试套件中测试流

scala - 令人困惑的 Scala 语法

Akka Streams : What does Mat represents in Source[out, Mat]

scala - 无法使用 IDEA 和 SBT 运行 LWJGL

scala - 使用 Apache Buildr 编译 Scala 2.8.x 代码

scala - 是否可以在 scala 的 akka ByteString 上进行模式匹配?

scala - 登录 Akka 时如何缩写 ActorRef 的路径?