pyspark - databricks 自动加载器使用 MAP() 类型作为模式提示

标签 pyspark databricks databricks-autoloader

我正在尝试使用 pyspark databricks 中的自动加载器设置 readStream:

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("inferSchema", True) \
  .option("cloudFiles.schemaLocation", schema_path) \
  .option("cloudFiles.schemaHints", "col1 string, col2 timestamp, col3 timestamp, col4 timestamp, col5 timestamp, col6 int, col7 MAP<STRING,STRING>, col8 MAP<STRING,STRING>, col9 MAP<STRING,STRING>, col10 MAP<STRING,STRING>, col11 MAP<STRING,STRING>, col12 MAP<STRING,STRING>, col13 MAP<STRING,STRING>") \
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load(raw_path_df) \
  .writeStream \
  .option("checkpointLocation", checkpoint_path) \
  .trigger(once=True)\
  .toTable(bronze_tbl)

但是,我不断收到 java.lang.Exception: Unsupported type: map<string,string>

不知道为什么会发生这种情况?我之前曾无数次使用自动加载器读取数据,并使用 Map() 类型作为模式提示。不确定我在这里缺少什么。

一旦我删除架构提示参数,上面的 readStream 中的代码就会起作用。

最佳答案

发生这种情况是因为 CSV 根据定义不支持复杂类型 - 仅支持字符串、数字……否则应使用什么类型的数据表示形式? JSON 编码,还是自定义的?

如果您将数据编码为 JSON,则只需将 from_json 应用到相应的列即可。

关于pyspark - databricks 自动加载器使用 MAP() 类型作为模式提示,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75826142/

相关文章:

apache-spark - 在 PySpark 的 DataFrame 中聚合列数组?

apache-spark - 如何在 Spark Structured Streaming 中指定批处理间隔?

apache-spark - Databricks DBT 运行时错误,无法连接到数据库。也许是 SSL 错误?

python - 在 databricks 上使用 dataframe 对象的 write 方法时出错

apache-spark - 将 "|"分隔的标题和列数据转换为 pyspark 中具有特定标题名称和数据行的单独列

apache-spark - PySpark 删除所有特殊字符的所有列名中的特殊字符

r - 将远程 R 包安装到 Databricks 集群而不是笔记本

apache-spark - 处理 Databricks 自动加载器中的重复项

azure - 为 Autoloader 选择什么样的节点 - Azure

python - 使用 csv 的 databricks 自动加载器时如何处理列名称中的无效字符?