scala - inferSchema=true does not work when reading CSV files in Spark Structured Streaming

Tags: scala apache-spark spark-structured-streaming spark-csv

I get the following error message:

java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.

    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:251)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:115)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:115)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
    at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:404)
    at io.sekai.core.streaming.KafkaDataGenerator.readFromCSVFile(KafkaDataGenerator.scala:38)
when I load a CSV file like this:
spark2
  .readStream
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)
I also tried passing the options in another form, e.g.:
spark2
  .readStream
  .format("csv")
  .option("inferSchema", value = true)
  .option("header", value = true)
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)
I want to infer the schema, so the explicit schema usage is commented out.
A sample of the CSV file looks like this:
id,Energy Data,Distance,Humidity,Ambient Temperature,Cold Water Temperature,Vibration Value 1,Vibration Value 2,Handle Movement
1,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
2,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
3,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
4,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
5,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
6,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
7,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
8,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
9,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
10,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
What is wrong here? I followed the option descriptions exactly, so why does schema inference not happen?
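(Side note, an observation about the sample rather than part of the original question: the Distance values contain an unquoted comma, as in `4in, 12cm`, so a plain comma split produces one more field per data row than the header declares, which can break CSV parsing regardless of schema inference. A quick check:)

```scala
// Quick check: a naive comma split of the sample yields more fields than headers,
// because the Distance value "4in, 12cm" contains an unquoted comma.
val header = "id,Energy Data,Distance,Humidity,Ambient Temperature,Cold Water Temperature,Vibration Value 1,Vibration Value 2,Handle Movement"
val row    = "1,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2"

val headerFields = header.split(",", -1).length // 9 declared columns
val rowFields    = row.split(",", -1).length    // 10 fields in the data row

println(s"header=$headerFields, row=$rowFields") // header=9, row=10
```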

Best Answer

You have two options here:

  • Write a sample of the data to the destination once before running the streaming query. When you run the streaming query again, the schema will be inferred.
  • Set spark.sql.streaming.schemaInference to true:

    spark.sql("set spark.sql.streaming.schemaInference=true")
    
    From the docs:

    By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.
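    The first option can be sketched as follows. This is a minimal, script-style sketch (the sample file, directory, and column names are made up for illustration): infer the schema once with a static read, as the error message itself suggests, then pass it explicitly to the streaming reader.

```scala
// Minimal sketch: infer the schema with a static read, then reuse it
// for the streaming read. Assumes Spark is on the classpath; the sample
// file and its columns are hypothetical.
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("csv-schema-inference")
  .master("local[*]")
  .getOrCreate()

// Create a small sample file so the static read has something to infer from.
val dir = Files.createTempDirectory("csv-input")
Files.write(dir.resolve("sample.csv"),
  "id,temperature\n1,25\n2,26\n".getBytes("UTF-8"))
val path = dir.toString

// Static read: inferSchema works here.
val staticDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(path)

// Streaming read: pass the inferred schema instead of inferSchema=true.
val streamDf = spark.readStream
  .option("header", "true")
  .option("maxFilesPerTrigger", 1)
  .schema(staticDf.schema)
  .csv(path)

// Alternative (second option): enable streaming schema inference globally:
// spark.sql("set spark.sql.streaming.schemaInference=true")

println(streamDf.isStreaming) // true
```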

    Regarding "scala - inferSchema=true does not work when reading CSV files in Spark Structured Streaming", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/69608099/
