scala - Akka流:状态不断

标签 scala stream akka akka-stream stateful

我想使用Akka Streams读取多个大文件来处理每一行。想象一下,每个键都由一个(identifier -> value)组成。如果找到了新的标识符,我想将其及其值保存在数据库中;否则,如果在处理行流时已经找到标识符,那么我只想保存该值。为此,我认为我需要某种递归的有状态流,以保留已在Map中找到的标识符。我想我会在此流程中收到一对(newLine, contextWithIdentifiers)

我刚刚开始研究Akka Streams。我想我可以管理自己做无状态处理工作,但是我不知道如何保留contextWithIdentifiers。我将不胜感激任何指向正确方向的指示。

最佳答案

也许像statefulMapConcat这样的东西可以帮助您:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.util.Random._
import scala.math.abs
import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

//encapsulating your input
case class IdentValue(id: Int, value: String)
//some random generated input
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere"))

val stateFlow = Flow[IdentValue].statefulMapConcat{ () =>
  //state with already processed ids
  var ids = Set.empty[Int]
  identValue => if (ids.contains(identValue.id)) {
    //save value to DB
    println(identValue.value)
    List(identValue)
  } else {
    //save both to database
    println(identValue)
    ids = ids + identValue.id
    List(identValue)
  }
}

Source(identValues)
  .via(stateFlow)
  .runWith(Sink.seq)
  .onSuccess { case identValue => println(identValue) }

关于scala - Akka流:状态不断,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37902354/

相关文章:

scala - 如何解决在逆变位置使用协变类型参数的问题

scala - Scala 中的非阻塞和阻塞 future 是什么?

stream - 如何在 Flink 中使用 ListState 进行 BroadcastProcessFunction

ios - AVPlayer seekTo 不准确

java - 如何从文件系统中获取文件属性流?

scala - Play 2.0 + Bootstrap3 : Showing active navigation item

scala - Akka:在Actor中一次处理一条消息的原因是什么?

c++ - 从输入流中读取具有跳过 block 能力的行

Scala Actor : doesn't receive messages

scala - 累积并排序 child Actor 的响应?