scala - 使用 foreach 行在数据框中捕获和写入字符串

标签 scala apache-spark hadoop apache-spark-sql

在使用 Scala 替换从数据帧的每一行的特定字段中获取的内容后, try catch 并写入字符串值。但是由于它部署在集群上无法捕获任何记录。谁能提供解决方案?

假设 TEST_DB.finalresult 有 2 个字段 input1 和 input2:

val finalresult=spark.sql("select * from TEST_DB.finalresult")

finalResult.foreach { row => 
    val param1=row.getAs("input1").asInstanceOf[String]
    val param2=row.getAs("input2").asInstanceOf[String]

    val string = """new values of param1 and param2 are -> """ + param1 + """,""" + param2
    // how to append modified string to csv file continously for each microbatch in hdfs ??
}

最佳答案

在您的代码中,您创建了所需的 string 变量,但它没有保存在任何地方,因此您看不到结果。

您可能会在每次执行 foreach 时打开所需的 csv 文件并附加新字符串,但我想提出一个不同的解决方案。

如果可以,请尝试始终使用 Spark 的内置功能,因为它(通常)在处理空输入方面更加优化和更好。您可以通过以下方式实现相同的目的:

import org.apache.spark.sql.functions.{lit, concat, col}

val modifiedFinalResult = finalResult.select(
 concat(
  lit("new values of param1 and param2 are -> "),
  col("input1"),
  lit(","),
  col("input2")
 ).alias("string")
)

在变量 modifiedFinalResult 中,您将拥有一个包含名为 string 的单列的 spark 数据框,它表示与您的变量 string 完全相同的输出你的代码。之后,您可以将数据帧直接保存为单个 csv 文件(使用重新分区功能):

modifiedFinalResult.repartition(1).write.format("csv").save("path/to/your/csv/output")

PS:也是对 future 的一个建议,尽量避免以数据类型命名变量。

UPDATE: Fixed the empty rows issue by using "concat_ws" instead of concat and coalesce to each fields. It seems some of the values which were null were transforming the entire concatenated string to null after the transformation. Nevertheless this solution works for now!

关于scala - 使用 foreach 行在数据框中捕获和写入字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56365224/

相关文章:

hadoop - 从另一个目录启动时 Hive 不显示表

scala - 为变量分配下划线。下划线在做什么?

scala - Scala中的DSL是什么?

mysql - 尝试添加 MySQL/slick 后出现奇怪的 Play Framework 2.2 异常

apache-spark - 根据另一列的不同值对列进行计数 pyspark

hadoop - 在配置单元表的顶部添加一些行

java - 用于编译 GUI 表单的 Ant 任务(Intellij IDEA)

json - Apache Spark : Convert column with a JSON String to new Dataframe in Scala spark

apache-spark - 使用 pyspark 将列名称动态传递到列列表中每列的检查条件是否为空条件

CentOS上的Hadoop伪分布式配置