scala - 流会被终止吗?

标签 scala akka akka-typed

我有 Actor ,如下所示:

enter image description here

正如您在图像中看到的,ActorStreamActor 的子级。 问题是,当我终止Actor时,ActorStream也会终止吗?

这是我如何在 Actor 中创建 ActorStream 的方法:

 def create(fsm: ActorRef[ServerHealth], cancel: Option[Cancellable]): Behavior[ServerHealthStreamer] =
    Behaviors.setup { context =>
      implicit val system = context.system
      implicit val materializer = ActorMaterializer()
      implicit val dispatcher = materializer.executionContext

      val kafkaServer = system
        .settings
        .config
        .getConfig("kafka")
        .getString("servers")

      val sink: Sink[ServerHealth, NotUsed] = ActorSink.actorRefWithAck[ServerHealth, ServerHealthStreamer, Ack](
        ref = context.self,
        onCompleteMessage = Complete,
        onFailureMessage = Fail.apply,
        messageAdapter = Message.apply,
        onInitMessage = Init.apply,
        ackMessage = Ack)

      val cancel = Source.tick(1.seconds, 15.seconds, NotUsed)
        .flatMapConcat(_ => Source.fromFuture(health(kafkaServer)))
        .map {
          case true =>
            KafkaActive
          case false =>
            KafkaInactive
        }
        .to(sink)
        .run()

      Behaviors.receiveMessage {
        case Init(ackTo) =>
          ackTo ! Ack
          Behaviors.same
        case Message(ackTo, msg) =>
          fsm ! msg
          ackTo ! Ack
          create(fsm, Some(cancel))
        case Complete =>
          Behaviors.same
        case Fail(_) =>
          fsm ! KafkaInactive
          Behaviors.same
      }
    }

最佳答案

在您的情况下, Actor 终止必须终止流,因为在后台舞台 Actor 观看传递的 actorRef 并完成舞台(如果终止到达)

我想你可以在这里找到更多信息 https://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/

An extremely important aspect to understand is that the materialized stream is running as a set of actors on the threads of the execution context on which they were allocated. In other words, the stream is running independently from the actor that allocated it. This becomes very important if the stream is long-running, or even infinite, and we want the actor to manage the life-cycle of the stream, such that when the actor stops, the stream is terminated. Expanding on the example above, I will make the stream infinite and use a KillSwitch to manage the life-cycle of the stream.

关于scala - 流会被终止吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56802459/

相关文章:

scala - 如何在它们自己的执行上下文中而不是在 Actor 系统调度程序上运行 future。 [斯卡拉| Akka ]

scala - 与 pipeTo 等效的 Akka 类型是什么?

scala - 使用 Scala 宏将符号拼接在一起

scala - 有没有更惯用的方法从 Option[IO[Option[A]] 获取 IO[Option[A]] 然后使用序列和映射连接?

xml - 在没有突变的情况下在 Scala 中修改 XML?

scala - 关于 "Too many HashedWheelTimer instances"的 Akka 警告

Scala 枚举 - 如何分配初始值?

scala - InvalidActorNameException - Actor 名称 {name} 不是唯一的

scala - 如何在 Behaviors.receive 中进行递归调用?

scala - 当我有对类型化actor系统的引用时,如何为 AkkaStreams 实例化一个物化器?