我正在尝试使用 pyspark databricks 中的自动加载器设置 readStream:
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("inferSchema", True) \
.option("cloudFiles.schemaLocation", schema_path) \
.option("cloudFiles.schemaHints", "col1 string, col2 timestamp, col3 timestamp, col4 timestamp, col5 timestamp, col6 int, col7 MAP<STRING,STRING>, col8 MAP<STRING,STRING>, col9 MAP<STRING,STRING>, col10 MAP<STRING,STRING>, col11 MAP<STRING,STRING>, col12 MAP<STRING,STRING>, col13 MAP<STRING,STRING>") \
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load(raw_path_df) \
.writeStream \
.option("checkpointLocation", checkpoint_path) \
.trigger(once=True)\
.toTable(bronze_tbl)
但是,我不断收到 java.lang.Exception: Unsupported type: map<string,string>
不知道为什么会发生这种情况?我之前曾无数次使用自动加载器读取数据,并使用 Map() 类型作为模式提示。不确定我在这里缺少什么。
一旦我删除架构提示参数,上面的 readStream 中的代码就会起作用。
最佳答案
发生这种情况是因为 CSV 根据定义不支持复杂类型 - 仅支持字符串、数字……否则应使用什么类型的数据表示形式? JSON 编码,还是自定义的?
如果您将数据编码为 JSON,则只需将 from_json
应用到相应的列即可。
关于pyspark - databricks 自动加载器使用 MAP() 类型作为模式提示,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75826142/