scala - 将元素放入流中并返回一个对象

标签 scala akka akka-stream

在akka中,我想将元素放入流中并返回一个对象。我知道这些元素可能是运行图表的来源。但是如何在运行时放置元素并返回对象呢?


import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.Array.range
import scala.util.Success

object StreamElement {

  implicit val system = ActorSystem("StreamElement")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def main(args: Array[String]): Unit = {
    val (queue, value) = Source
    .queue[Int](10, OverflowStrategy.backpressure)
    .map(x => {
      x * x
    })
    .toMat(Sink.asPublisher(false))(Keep.both)
    .run()

    range(0, 10)
      .map(x => {
        queue.offer(x).onComplete {
          case Success(Enqueued) => {
          }

          case Success(Dropped) => {}
          case _ => {
             println("others")
          }
        }
      })
    }
}

如何获取返回值?

最佳答案

实际上,您想要返回每个元素的 int 值。 因此,您可以创建流,然后每次连接到源和接收器。


package tech.parasol.scala.akka

import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.Array.range
import scala.util.Success

object StreamElement {

  implicit val system = ActorSystem("StreamElement")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val flow = Flow[Int]
    .buffer(16, OverflowStrategy.backpressure)
    .map(x => x * x)

  def main(args: Array[String]): Unit = {
    range(0, 10)
      .map(x => {
        Source.single(x).via(flow).runWith(Sink.head)
      }.map( v => println("v ===> " + v)
      ))
  }

}


关于scala - 将元素放入流中并返回一个对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60104206/

相关文章:

Scala:隐式参数解析优先级

java - 警告 TaskSetManager : Lost Task xxx: java. lang.ArrayIndexOutOfBoundsException: 1 - Scala

java - akka actors 工具包 - context.actorOf 与 system.actorOf

scala - 使用 Akka Actors 处理多个 TCP 连接

scala - groupBy 的子流可以依赖于它们生成的键吗?

scala - 如何使用 Akka Persistence 保存流式数据

scala - 创建 Spring Data JPA 存储库的 Scala 方法

scala - 如何获取 Scala 中的当前(工作)目录?

scala - Actor 可以将对象返回给等待请求的 future 吗?

scala - 将 TCP 服务器的输出捕获到 Akka Stream 队列的最推荐方法是什么?