scala - 如何使用FileFormat格式的更新输出模式?

标签 scala apache-spark spark-structured-streaming

我正在尝试在更新输出模式下使用 Spark 结构化流写入文件。我发现this StructuredSessionization example只要配置了控制台格式,它就可以正常工作。但如果我将输出模式更改为:

 val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("json")
  .option("path", "/work/output/data")
  .option("checkpointLocation", "/work/output/checkpoint")
  .start()

我收到以下错误:

 Exception in thread "main" org.apache.spark.sql.AnalysisException: Data source json does not support Update output mode;
        at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:279)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
        at palyground.StructuredStreamingMergeSpans$.main(StructuredStreamingMergeSpans.scala:84)
        at palyground.StructuredStreamingMergeSpans.main(StructuredStreamingMergeSpans.scala)

我可以使用更新模式并使用 FileFormat 将结果表写入文件接收器吗? 在源代码中我发现了一个确保追加模式的模式匹配。

最佳答案

您无法使用 Spark 结构化流以更新模式写入文件。您需要为其编写ForeachWriter。我在这里为每个作家写了简单的内容。您可以根据您的要求进行修改。

val writerForText = new ForeachWriter[Row] {
    var fileWriter: FileWriter = _

    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString(","))
    }

    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }

    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
      fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
      true

    }
  }

val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .foreach(writerForText)
  .start()

关于scala - 如何使用FileFormat格式的更新输出模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49026429/

相关文章:

java - 将 Spark 流数据帧写入 MongoDB

java - 如何从 Kafka 访问记录中的文件路径并从中创建数据集?

scala - 如何在Intellij IDEA中运行Spark示例程序

docker - Spark Standalone + Zeppelin + Docker:如何设置SPARK_HOME

performance - Spark : Tackle performance intensive commands like collect(), groupByKey(), reduceByKey()

scala - 如何在 Scala 中将字符串转换为整数列表?

scala - Spark 结构化流媒体 avro 到 avro 和自定义 Sink

scala - Play Framework 中指定的基础 docker 镜像在哪里?

scala - Scala 中不区分大小写的模式匹配

带有 scala setAdapter InvocationTargetException 的 Android