scala - 在涉及多个 JVM 的 Akka Streams 中维持背压的方法

标签 scala akka-stream reactive-streams backpressure akka-remoting

我知道从 Akka 2.4.16 开始, react 流没有“远程”实现。该规范重点关注在单个 JVM 上运行的流。

但是,考虑到用例涉及另一个 JVM 进行某些处理,同时保持背压。这个想法是有一个主应用程序提供运行流的用户界面。例如,该流有一个执行一些繁重计算的阶段,这些计算应该在不同的机器上运行。我对以分布式方式运行流的方法感兴趣 - 我遇到了一些文章,指出了一些想法:

还有哪些其他选择?上述有什么明显的缺点吗?有什么需要考虑的特殊特征吗?

更新:这个问题并不限于单个用例。我通常对在分布式环境中使用流的所有可能方法感兴趣。这意味着,例如它只能涉及一个将 actor 与 .mapAsync 集成的流,或者例如两台机器上可能有两个独立的流通过 Akka HTTP 进行通信。唯一的要求是必须在所有组件之间强制执行背压。

最佳答案

嗯...看来我必须为此添加一个示例。您需要了解的一件事是 BackPressure 由 GraphStages 中的 AsyncBoundries 处理。它确实与其他地方存在的组件无关。另外...它不依赖于 Artery,它只不过是新的远程传输。

这里是一个可能是最简单的跨 jvm 流的示例,

第一次申请,

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}

class MyActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg @ _ => {
      log.info(msg.toString)
      sender() ! msg
    }
  }
}

object MyApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=18000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("my-actor-system", config)

  var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}

第二个应用程序...实际上“运行”使用第一个应用程序中的参与者的流。

import akka.actor.{ActorPath, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.pattern.ask
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps
import scala.concurrent.duration._

object YourApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=19000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("your-actor-system", config)

  import actorSystem.dispatcher

  val logger = actorSystem.log

  implicit val implicitActorSystem = actorSystem
  implicit val actorMaterializer = ActorMaterializer()

  val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@127.0.0.1:18000/user/my-actor")

  val myActorSelection = actorSystem.actorSelection(myActorPath)

  val source = Source(1 to 10)

  // here this "mapAsync" wraps the given T => Future[T] function in a GraphStage
  val myRemoteComponent = Flow[Int].mapAsync(2)(i => {
    myActorSelection.resolveOne(1 seconds).flatMap(myActorRef => 
      (myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int])
    )
  })

  val sink = Sink.foreach[Int](i => logger.info(i.toString))

  val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right)

  val streamRun = stream.run()

}

关于scala - 在涉及多个 JVM 的 Akka Streams 中维持背压的方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41719483/

相关文章:

scala - 使用 Akka Streams 的 worker 池

java - 在 Reactor 中实现异步副作用的最佳方法是什么?

scala - 处理用 scala 中的 Try 包裹的 Future

Java Apache Math3 MersenneTwister VS Python 随机

generics - Scala 泛型与 C# 的比较

postgresql - 将 GeoLocation Twitter4J 写入 Postgres

amazon-s3 - 在 akka-http/akka-streams 中上传/下载文件时出现问题

scala - akka 流消耗网络套接字

mysql - Reactive Streams Specification 1.0 发布后,jdbc 规范是否也会响应式?

scala - 如何使用 Spark/Scala 中的频率计数从文本文件创建二元组?