scala - 如何使用 Akka Streams 在分隔符上拆分入站流

标签 scala akka akka-stream

我一直在玩一些实验性的 Akka Streams API,我有一个用例,我想看看如何实现。对于我的用例,我有一个 StreamTcp基于 Flow这是通过将连接的输入流绑定(bind)到我的服务器套接字来提供的。我拥有的流程基于 ByteString数据进入它。传入的数据将有一个分隔符,这意味着我应该将分隔符之前的所有内容视为一条消息,并将下一个分隔符之后的所有内容视为下一条消息。所以玩一个更简单的例子,不使用套接字,只使用静态文本,这就是我想出的:

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString

object BasicTransformation {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

    Flow(data).
      splitWhen(c => c == '.').
      foreach{producer => 
        Flow(producer).
          filter(c => c != '.').
          fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
          map(_.toString).
          filter(!_.isEmpty).
          foreach(println(_)).
          consume(FlowMaterializer(MaterializerSettings()))
      }.
      onComplete(FlowMaterializer(MaterializerSettings())) {
        case any =>
          system.shutdown
      }
  }
}
Flow上的主要功能我发现实现我的目标是splitWhen ,然后产生额外的子流,每条消息一个 .分隔符。然后,我使用另一个步骤管道处理每个子流,最后在最后打印各个消息。

这一切似乎有点冗长,以完成我认为非常简单和常见的用例。所以我的问题是,是否有一种更简洁、更简洁的方法来执行此操作,或者这是通过分隔符拆分流的正确和首选方法?

最佳答案

我认为安德烈使用 Framing是您问题的最佳解决方案,但我遇到了类似的问题,发现 Framing太有限了。我用了statefulMapConcat相反,它允许您使用您喜欢的任何规则对输入的 ByteString 进行分组。这是您的问题的代码,以防它帮助任何人:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

object BasicTransformation extends App {

  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()
  implicit val dispatcher = system.dispatcher
  val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

  val grouping = Flow[Byte].statefulMapConcat { () =>
    var bytes = ByteString()
    byt =>
      if (byt == '.') {
        val string = bytes.utf8String
        bytes = ByteString()
        List(string)
      } else {
        bytes :+= byt
        Nil
      }
  }

  Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate())
}

产生: Lorem Ipsum is simply Dummy text of the printing And typesetting industry

关于scala - 如何使用 Akka Streams 在分隔符上拆分入站流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25631099/

相关文章:

scala - 如何使用 Reads 对 JsValue 进行模式匹配

reflection - Scala:从字段名称反射地设置字段值

java - 如何在 Akka 中升级最顶级的主管?

scala - Akka http:Akka流与 Actor 建立休息服务

scala - 依靠特征中的案例类方法

Scala 类的多重继承

scala - 在 Akka Typed persistent actor 中创建子 actor

scala - 为什么会出现Conflicting cross-version suffixes的错误?

akka - 在 Akka-Streams 中拆分流

scala - Akka Streams 按类型拆分流