scala - 按分隔符分割 Spark 流

标签 scala apache-spark spark-streaming

我正在尝试根据分隔符分割 Spark 流,并将每个 block 保存到一个新文件中。

我的每个 RDD 似乎都是根据分隔符进行分区的。

我很难为每个 RDD 配置一条分隔符消息,或者无法将每个分区单独保存到新的 part-000... 文件。

任何帮助将不胜感激。谢谢

 val sparkConf = new SparkConf().setAppName("DataSink").setMaster("local[8]").set("spark.files.overwrite","false")
 val ssc = new StreamingContext(sparkConf, Seconds(2))

 class RouteConsumer extends Actor with ActorHelper with Consumer {
    def endpointUri = "rabbitmq://server:5672/myexc?declare=false&queue=in_hl7_q"
    def receive = {
        case msg: CamelMessage =>
           val m = msg.withBodyAs[String]
           store(m.body)
     }
 }

 val dstream = ssc.actorStream[String](Props(new RouteConsumer()), "SparkReceiverActor")
 val splitStream = dstream.flatMap(_.split("MSH|^~\\&"))
 splitStream.foreachRDD( rdd => rdd.saveAsTextFile("file:///home/user/spark/data") )

 ssc.start()
 ssc.awaitTermination()

最佳答案

您无法控制哪个part-NNNNN (分区)文件获取哪个输出,但您可以写入不同的目录。进行这种列分割的“最简单”方法是使用单独的映射语句(例如 SELECT 语句),类似这样,假设您有 n分割后的数组元素:

... val dstream2 = dstream.map(_.split("...")) // like above, but with map dstream2.cache() // very important for what follows, repeated reads of this... val dstreams = new Array[DStream[String]](n) for (i <- 0 to n-1) { dstreams[i] = dstream2.map(array => array[i] /* or similar */) dstreams[i].saveAsTextFiles(rootDir+"/"+i) } ssc.start() ssc.awaitTermination()

关于scala - 按分隔符分割 Spark 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31568134/

相关文章:

'should contain' 的 Scalatest 自定义匹配器

scala - 将 StructType 定义为函数 Spark-Scala 2.11 的输入数据类型

java - 如何通过 java 代码将 java HashMap 转换为不可变的 Scala 映射?

hadoop - 为什么此示例导致NaN?

python - PySpark Streaming 示例似乎没有终止

Sonar 中的 Scala 插件

hadoop - Dataproc 上的 "processing node"是什么?

apache-spark - Apache Spark 中的“哪里”

scala - 无法在已停止的 SparkContext 上调用方法

apache-spark - Spark Streaming 加入 GreenPlum/Postgres 数据。方法