我知道从 Akka 2.4.16 开始, react 流没有“远程”实现。该规范重点关注在单个 JVM 上运行的流。
但是,考虑到用例涉及另一个 JVM 进行某些处理,同时保持背压。这个想法是有一个主应用程序提供运行流的用户界面。例如,该流有一个执行一些繁重计算的阶段,这些计算应该在不同的机器上运行。我对以分布式方式运行流的方法感兴趣 - 我遇到了一些文章,指出了一些想法:
- 使用 Akka HTTP 通过 TCP 连接流 ( Stackoverflow )
- 使用 Artery 进行一定程度的简化( Stackoverflow 、 Akka Blog )
- 将 actor 集成到流中( Answer from Viktor Klang 、 Akka Docs )
还有哪些其他选择?上述有什么明显的缺点吗?有什么需要考虑的特殊特征吗?
更新:这个问题并不限于单个用例。我通常对在分布式环境中使用流的所有可能方法感兴趣。这意味着,例如它只能涉及一个将 actor 与 .mapAsync
集成的流,或者例如两台机器上可能有两个独立的流通过 Akka HTTP 进行通信。唯一的要求是必须在所有组件之间强制执行背压。
最佳答案
嗯...看来我必须为此添加一个示例。您需要了解的一件事是 BackPressure 由 GraphStages 中的 AsyncBoundries 处理。它确实与其他地方存在的组件无关。另外...它不依赖于 Artery,它只不过是新的远程传输。
这里是一个可能是最简单的跨 jvm 流的示例,
第一次申请,
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
class MyActor extends Actor with ActorLogging {
override def receive: Receive = {
case msg @ _ => {
log.info(msg.toString)
sender() ! msg
}
}
}
object MyApplication extends App {
val config = ConfigFactory.parseString(
"""
|akka{
| actor {
| provider = remote
| }
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| untrusted-mode = off
| netty.tcp {
| hostname="127.0.0.1"
| port=18000
| }
| }
|}
""".stripMargin
)
val actorSystem = ActorSystem("my-actor-system", config)
var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")
}
第二个应用程序...实际上“运行”使用第一个应用程序中的参与者的流。
import akka.actor.{ActorPath, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.pattern.ask
import com.typesafe.config.ConfigFactory
import scala.language.postfixOps
import scala.concurrent.duration._
object YourApplication extends App {
val config = ConfigFactory.parseString(
"""
|akka{
| actor {
| provider = remote
| }
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| untrusted-mode = off
| netty.tcp {
| hostname="127.0.0.1"
| port=19000
| }
| }
|}
""".stripMargin
)
val actorSystem = ActorSystem("your-actor-system", config)
import actorSystem.dispatcher
val logger = actorSystem.log
implicit val implicitActorSystem = actorSystem
implicit val actorMaterializer = ActorMaterializer()
val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@127.0.0.1:18000/user/my-actor")
val myActorSelection = actorSystem.actorSelection(myActorPath)
val source = Source(1 to 10)
// here this "mapAsync" wraps the given T => Future[T] function in a GraphStage
val myRemoteComponent = Flow[Int].mapAsync(2)(i => {
myActorSelection.resolveOne(1 seconds).flatMap(myActorRef =>
(myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int])
)
})
val sink = Sink.foreach[Int](i => logger.info(i.toString))
val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right)
val streamRun = stream.run()
}
关于scala - 在涉及多个 JVM 的 Akka Streams 中维持背压的方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41719483/