scala - 如何在 Actor 中停止 Source.tick?

标签 scala akka akka-stream

我有一个 Actor ,每 2 秒产生一次 NotUsed .也许它没有任何意义,但它只是为了测试目的。

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, Props}
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.sweetsoft.FsmSystem.{Add, StartTicker, StopTicker}

import scala.concurrent.duration._

object AddActor {

  def props: Props = Props(new AddActor)

}

final class AddActor extends Actor with ActorLogging {

  implicit val materilizer = ActorMaterializer()

  private val consumer: NotUsed => Unit = _ =>
    context.parent ! Add

  private val runnable = Source.tick(2.second, 2.second, NotUsed)
    .named("Ticker")
    .toMat(Sink.foreach(consumer))(Keep.both)

  override def receive: Receive = {
    case StartTicker =>

      runnable.run()
    case StopTicker =>

  }
}  

当Actor收到消息StopTicker ,然后我想停止流。通过 StartTicker ,然后启动流。

调用run()方法,我会得到物化值Cancellabel但无法在StopTicker 内访问范围。

我该怎么办?

最佳答案

您可以使用 becode/unbecome图案:

final class AddActor extends Actor with ActorLogging {
    import context._

    implicit val materilizer: ActorMaterializer = ActorMaterializer()

    private val consumer: NotUsed => Unit = _ =>
      self ! Add //send Add to self rather than directly to context.parent

    private val runnable = Source.tick(2.second, 2.second, NotUsed)
      .named("Ticker")
      .toMat(Sink.foreach(consumer))(Keep.both)

    override def preStart(): Unit = { //start runnable on actor start
      super.preStart()
      runnable.run()
    }

    val running: Receive = { //when running ignore new StartTicker but handle StopTicker and Add
      case StopTicker =>
        become(paused)

      case Add =>
        context.parent ! Add
    }

    val paused: Receive = { //handle just StartTicker
      case StartTicker =>
        become(running)
    }

    override def receive: Receive = paused //initial receive is paused

}

关于scala - 如何在 Actor 中停止 Source.tick?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56093224/

相关文章:

scala - 密封特征/对象案例类字节代码从 2.9.1 更改。到 2.9.2?

java - AKKA:Actor 路由器消息持久化

scala - Akka Stream + Akka Http - 获取错误请求

scala - 使用 Akka 进行 fork 和 join

scala - 启用 REPL 高级用户模式 ​​( :power) from script

scala - 动态创建和设置场景

xml - 使用拉式解析器的 Scala 内存泄漏

scala - 数据流的 SHA256

java - Akka 。如何使用 Source.actorRef 完成流?

scala - 通过 Akka HTTP/Akka Streams 转发(下载/上传)大文件