scala - How to save streaming aggregations in complete output mode to Parquet?

Tags: scala apache-spark parquet spark-structured-streaming

I have applied an aggregation on a streaming DataFrame using complete output mode. To save the DataFrame locally, I implemented a foreach sink. I am able to save the DataFrame as text, but I need to save it in Parquet format.

import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}

val writerForText = new ForeachWriter[Row] {
  var fileWriter: FileWriter = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Create one output directory per partition and open a plain-text writer
    FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
    fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
    true
  }

  override def process(value: Row): Unit = {
    // Write each row as a comma-separated line of text
    fileWriter.append(value.toSeq.mkString(","))
  }

  override def close(errorOrNull: Throwable): Unit = {
    fileWriter.close()
  }
}

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val columnName = "col1"
frame
  .select(
    count(columnName), min(columnName), mean(columnName),
    max(columnName), first(columnName), last(columnName), sum(columnName))
  .writeStream
  .outputMode(OutputMode.Complete())
  .foreach(writerForText)
  .start()

How can I achieve this?
Thanks in advance!

Best Answer

To save the DataFrame locally, I have implemented a foreach sink. I am able to save the DataFrame in text form. But I need to save it in Parquet format.

The default format when saving a streaming Dataset is... Parquet. That being said, you don't have to use the fairly advanced foreach sink at all; the plain parquet sink is all you need.

The query could be as follows:
scala> :type in
org.apache.spark.sql.DataFrame

scala> in.isStreaming
res0: Boolean = true

in.writeStream.
  option("checkpointLocation", "/tmp/checkpoint-so").
  start("/tmp/parquets")

Regarding scala - How to save streaming aggregations in complete output mode to Parquet?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46420576/
