scala - 访问由 Source.actorRef 创建的 akka 流源的底层 ActorRef

标签 scala akka akka-stream

我正在尝试使用 Source.actorRef创建 akka.stream.scaladsl.Source 的方法目的。某种形式

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

我的问题是:如何将数据发送到基于 ActorRef 的源对象 ?

我认为向源发送消息是某种形式
//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

但是weatherSource没有 !运算符(operator)或 tell方法。

documentation关于如何使用 Source.actorRef 并没有太多描述,它只是说你可以......

预先感谢您的评论和回复。

最佳答案

您需要一个 Flow :

  import akka.stream.OverflowStrategy.fail
  import akka.stream.scaladsl.Source
  import akka.stream.scaladsl.{Sink, Flow}

  case class Weather(zip : String, temp : Double, raining : Boolean)

  val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

  val sunnySource = weatherSource.filter(!_.raining)

  val ref = Flow[Weather]
    .to(Sink.ignore)
    .runWith(sunnySource)

  ref ! Weather("02139", 32.0, true)

请记住,这都是实验性的,可能会改变!

关于scala - 访问由 Source.actorRef 创建的 akka 流源的底层 ActorRef,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30785011/

相关文章:

scala - 在持久化 RDD 上有多个操作的情况下,缓存 RDD 的工作原理

java - 使用 Akka http 处理 HttpResponse 404 上的重试逻辑

java - scala:为什么 1/0 是算术异常但 1.0/0.0 = Double.Infinity

scala - 在 akka actor 中使用 future 回调

scala - Akka 流(Scala): Filtering out exceptions

scala - Spark session 返回错误 : Apache NiFi

java - Play Framework适合异步后台处理吗?

scala - Akka Actors 中的 Quartz CronTriggers 使用还是不使用 Camel?

scala - Akka Stream TLS 服务器日志记录和故障排除

debugging - 如何调试akka Streams流量?