scala - 如何处理spark中的多行?

标签 scala apache-spark

我有一个包含一些多行观察的数据框:

+--------------------+----------------+
|         col1|               col2|
+--------------------+----------------+
|something1           |somethingelse1  |
|something2           |somethingelse2  |
|something3           |somethingelse3  |
|something4           |somethingelse4  |
|multiline

 row               |     somethings|
|something            |somethingall    |

我想要的是将此数据框保存为 csv 格式(或 txt)。使用以下内容:

df
 .write
 .format("csv")
 .save("s3://../adf/")

但是当我检查文件时,它将观察结果分成多行。我想要的是具有“多行”观测值的行成为 txt/csv 文件中的同一行。我尝试将其另存为txt文件:

df
.as[(String,String)]
.rdd
.saveAsTextFile("s3://../adf")

但观察到相同的输出。

我可以想象一种方法是将 \n 替换为其他内容,然后在加载回来后执行相反的功能。但是有没有办法以所需的方式保存它而不对数据进行任何类型的转换?

最佳答案

假设多行数据被正确引用,您可以使用 univocity 解析器和 multiLine 设置解析多行 csv 数据

sparkSession.read
  .option("parserLib", "univocity")
  .option("multiLine", "true")
  .csv(file)

请注意,这需要将整个文件读取到单个执行器上,如果您的数据太大,则可能无法工作。标准文本文件读取将在执行任何其他解析之前按行拆分文件,这将阻止您处理包含换行符的数据记录,除非您可以使用不同的记录分隔符。如果没有,您可能需要实现自定义 TextInputFormat 来处理多行记录。

关于scala - 如何处理spark中的多行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46408558/

相关文章:

scala - 为什么这个公共(public)字段有一个 PRIVATE 标志?

Scala 闭包文件名

scala - 使用 Lift 更改 <title>

java - 当我尝试在 ubuntu 上启动 Scala 时抛出异常

apache-spark - updateStateByKey函数返回的DStream是否只包含一个RDD?

apache-spark - 如何在2个Spark上下文之间共享Spark RDD?

python - 如何将spark数据输出到具有单独列的csv文件?

scala - 如何安装旧版本的sbt?

scala - 为什么收集数据集会因 org.apache.spark.shuffle.FetchFailedException 而失败?

apache-spark - 使用 Dispatcher 的 Spark Mesos 集群模式