我有 Actor ,如下所示:
正如您在图像中看到的,ActorStream
是 Actor
的子级。
问题是,当我终止Actor
时,ActorStream
也会终止吗?
这是我如何在 Actor
中创建 ActorStream
的方法:
def create(fsm: ActorRef[ServerHealth], cancel: Option[Cancellable]): Behavior[ServerHealthStreamer] =
Behaviors.setup { context =>
implicit val system = context.system
implicit val materializer = ActorMaterializer()
implicit val dispatcher = materializer.executionContext
val kafkaServer = system
.settings
.config
.getConfig("kafka")
.getString("servers")
val sink: Sink[ServerHealth, NotUsed] = ActorSink.actorRefWithAck[ServerHealth, ServerHealthStreamer, Ack](
ref = context.self,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply,
messageAdapter = Message.apply,
onInitMessage = Init.apply,
ackMessage = Ack)
val cancel = Source.tick(1.seconds, 15.seconds, NotUsed)
.flatMapConcat(_ => Source.fromFuture(health(kafkaServer)))
.map {
case true =>
KafkaActive
case false =>
KafkaInactive
}
.to(sink)
.run()
Behaviors.receiveMessage {
case Init(ackTo) =>
ackTo ! Ack
Behaviors.same
case Message(ackTo, msg) =>
fsm ! msg
ackTo ! Ack
create(fsm, Some(cancel))
case Complete =>
Behaviors.same
case Fail(_) =>
fsm ! KafkaInactive
Behaviors.same
}
}
最佳答案
在您的情况下, Actor 终止必须终止流,因为在后台舞台 Actor 观看传递的 actorRef 并完成舞台(如果终止到达)
我想你可以在这里找到更多信息 https://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/
An extremely important aspect to understand is that the materialized stream is running as a set of actors on the threads of the execution context on which they were allocated. In other words, the stream is running independently from the actor that allocated it. This becomes very important if the stream is long-running, or even infinite, and we want the actor to manage the life-cycle of the stream, such that when the actor stops, the stream is terminated. Expanding on the example above, I will make the stream infinite and use a KillSwitch to manage the life-cycle of the stream.
关于scala - 流会被终止吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56802459/