akka - 在 Akka Streams 中使用动态接收器目标

标签 akka akka-stream

我对 Akka 比较陌生,正在尝试学习基础知识。我的用例是不断从 JMS 队列读取消息并将每条消息输出到一个新文件。我有基本的设置:

Source<String, NotUsed> jmsSource =
  JmsSource
    .textSource(JmsSourceSettings
    .create(connectionFactory)
    .withQueue("myQueue")
    .withBufferSize(10));

Sink<ByteString, CompletionStage<IOResult>> fileSink =
  FileIO.toFile(new File("random.txt"));

final Flow<String, ByteString, NotUsed> flow = Flow.fromFunction((String n) -> ByteString.fromString(n));

final RunnableGraph<NotUsed> runnable = jmsSource.via(flow).to(fileSink);

runnable.run(materializer);

但是,我希望文件名是动态的(而不是硬编码为“random.txt”):它应该根据队列中每条消息的内容进行更改。当然,我可以在流程中选择文件名,但是如何在 fileSink 中设置该名称?我如何最好地设置它?

最佳答案

我基于 akka.stream.impl.LazySink 创建了一个简单的 Sink .我只在成功案例中使用单个元素对其进行了测试,因此请随时在此处或 GitHub Gist 发表评论。 .

import akka.NotUsed
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage._

class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] {

  val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in")
  override val shape = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    override def preStart(): Unit = pull(in)

    val awaitingElementHandler = new InHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        val innerSource = createInnerSource(element)
        val innerSink = sink(element)
        Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer)
      }

      override def onUpstreamFinish(): Unit = completeStage()

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    }
    setHandler(in, awaitingElementHandler)

    def createInnerSource(element: T): SubSourceOutlet[T] = {
      val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource")

      innerSource.setHandler(new OutHandler {
        override def onPull(): Unit = {
          innerSource.push(element)
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          } else {
            pull(in)
            setHandler(in, awaitingElementHandler)
          }
        }

        override def onDownstreamFinish(): Unit = {
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          }
        }
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val illegalStateException = new IllegalStateException("Got a push that we weren't expecting")
          innerSource.fail(illegalStateException)
          failStage(illegalStateException)
        }

        override def onUpstreamFinish(): Unit = {
          // We don't stop until the inner stream stops.
          setKeepGoing(true)
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          innerSource.fail(ex)
          failStage(ex)
        }
      })

      innerSource
    }
  }
}

object OneToOneOnDemandSink {

  def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = Sink.fromGraph(new OneToOneOnDemandSink(sink))
}

这将为每个元素创建一个新的 Sink,因此它避免了 LazySink 的大量复杂性。并且也没有合理的物化值(value)可以返回。

关于akka - 在 Akka Streams 中使用动态接收器目标,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45192072/

相关文章:

java - 分析线程转储 - sun.misc.Unsafe.park 上有很多被阻塞的线程

java - 在 AKKA 中,调用 supervisor 的 shutdown 会停止它所监督的所有 actor 吗?

java - Akka 和 Ask 模式。当 Actor 突然停止时我可以返回 Future 吗?

scala - 为什么 Akka Streams 会吞下我的异常?

java - 动态改变 Akka Streams 的节流级别

performance - 在 Akka 中创建 Actor 的成本是多少?

java - Akka Java 文件 IO 限制

scala - Akka流:状态不断

scala - Akka 流卡夫卡: No configuration setting found for key 'kafka-clients'