Hi, I am writing Python code in Databricks that takes a huge JSON file and splits it into two parts: one file holding index 0 ("reporting_entity_name") through index 3 ("version"), and another holding index 4 through the end. It splits the file fine at index 1 of the JSON file, but when I provide index 0 it fails with:
Datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s).
Here is sample data from the large JSON file:
{
    "reporting_entity_name": "launcher",
    "reporting_entity_type": "launcher",
    "last_updated_on": "2020-08-27",
    "version": "1.0.0",
    "in_network": [
        {
            "negotiation_arrangement": "ffs",
            "name": "Boosters",
            "billing_code_type": "CPT",
            "billing_code_type_version": "2020",
            "billing_code": "27447",
            "description": "Boosters On Demand",
            "negotiated_rates": [
                {
                    "provider_groups": [
                        {
                            "npi": [0],
                            "tin": {
                                "type": "ein",
                                "value": "11-1111111"
                            }
                        }
                    ],
                    "negotiated_prices": [
                        {
                            "negotiated_type": "negotiated",
                            "negotiated_rate": 123.45,
                            "expiration_date": "2022-01-01",
                            "billing_class": "organizational"
                        }
                    ]
                }
            ]
        }
    ]
}
Here is the Python code:
from pyspark.sql.functions import explode, col
import itertools
# Read the JSON file from Databricks storage
df_json = spark.read.option("multiline","true").json("/mnt/BigData_JSONFiles/SampleDatafilefrombigfile.json")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
# Convert the dataframe to a dictionary
data = df_json.toPandas().to_dict()
# Split the data into two parts
d1 = dict(itertools.islice(data.items(), 1))
d2 = dict(itertools.islice(data.items(), 1, len(data.items())))
# Convert the first part of the data back to a dataframe
df1 = spark.createDataFrame([d1])
# Write the first part of the data to a JSON file in Databricks storage
df1.write.format("json").save("/mnt/BigData_JSONFiles/SampleDatafilefrombigfile_detail.json")
# Convert the second part of the data back to a dataframe
df2 = spark.createDataFrame([d2])
# Write the second part of the data to a JSON file in Databricks storage
df2.write.format("json").save("/mnt/BigData_JSONFiles/SampleDatafilefrombigfile_header.json")
Here is the output of the two files. You can see that the detail file, which should contain only the "in_network" data, also has the index 0 data, "reporting_entity_name", which belongs in the header file, not the detail file:
{
    "in_network": [
        {
            "negotiation_arrangement": "ffs",
            "name": "Boosters",
            "billing_code_type": "CPT",
            "billing_code_type_version": "2020",
            "billing_code": "27447",
            "description": "Boosters On Demand",
            "negotiated_rates": [
                {
                    "provider_groups": [
                        {
                            "npi": [0],
                            "tin": {
                                "type": "ein",
                                "value": "11-1111111"
                            }
                        }
                    ],
                    "negotiated_prices": [
                        {
                            "negotiated_type": "negotiated",
                            "negotiated_rate": 123.45,
                            "expiration_date": "2022-01-01",
                            "billing_class": "organizational"
                        }
                    ]
                }
            ]
        }
    ]
}
And here is the output of the header file, which starts from index 1:
{"reporting_entity_type": "launcher",
"last_updated_on": "2020-08-27",
"version": "1.0.0"}
Please help me resolve this error. Any guidance on the code would be very helpful.
Here is a screenshot of the large JSON file; it is an exact copy of the file attached above. I increased the cluster from 2 GB to 8 GB, but I get the same error with the large file, where the dict inside in_network occurs 714 times. Why does it fail on the big file when it is exactly the same? I also changed the code line from the answer to:
df_network=df_json.select(df_json.columns[714:])
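For reference, df_json.columns lists only the top-level keys of the JSON, so the 714 dicts inside in_network are not top-level columns; slicing past the real column count selects nothing. A minimal sketch of that failing pattern (the output path here is hypothetical):
cols = df_json.columns                  # only the few top-level keys, not the 714 in_network entries
df_empty = df_json.select(cols[714:])   # slice past the end -> empty list -> zero-column dataframe
# Writing a zero-column dataframe raises:
# AnalysisException: Datasource does not support writing empty or nested empty schemas.
df_empty.write.format("json").save("/mnt/BigData_JSONFiles/empty_demo.json")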
Here is the traceback:
AnalysisException Traceback (most recent call last)
<command-863551447189973> in <cell line: 13>()
11 df_version=df_json.select(df_json.columns[:1])
12
---> 13 df_network.write.format("json").save("/mnt/BigData_JSONFiles/2022-10_040_05C0_in-network-rates_2_of_2_detail.json")
14 df_version.write.format("json").save("/mnt/BigData_JSONFiles/2022-10_040_05C0_in-network-rates_2_of_2_header.json")
15 display(df_network)
/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature
/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
966 self._jwrite.save()
967 else:
--> 968 self._jwrite.save(path)
969
970 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1323
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
200 # Hide where the exception came from that shows a non-Pythonic
201 # JVM exception message.
--> 202 raise converted from None
203 else:
204 raise
AnalysisException:
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
Best Answer
I replicated your code and got the following result in one of the files:
{"last_updated_on":{"0":"2020-08-27"},"reporting_entity_name":{"0":"launcher"},"reporting_entity_type":{"0":"launcher"},"version":{"0":"1.0.0"}}
The inner 0 key is probably due to the use of a dictionary and pandas.
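A minimal sketch of where that key comes from, using plain pandas with hypothetical one-row data:
import pandas as pd

pdf = pd.DataFrame([{"version": "1.0.0"}])
print(pdf.to_dict())
# {'version': {0: '1.0.0'}} -- to_dict() nests the row index 0 under every column by default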
Since your JSON has the same structure, you can try the following workaround, which splits the JSON using select instead of converting it to a dictionary.
Here is the original dataframe from the JSON file.
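A sketch of what that dataframe's schema looks like, assuming Spark's usual alphabetical ordering of inferred JSON fields (nested element types elided):
df_json.printSchema()
# root
#  |-- in_network: array
#  |-- last_updated_on: string
#  |-- reporting_entity_name: string
#  |-- reporting_entity_type: string
#  |-- version: string
This ordering is why columns[:1] selects in_network and columns[1:] selects the four header fields.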
So, use select to generate the required JSON files:
df_network=df_json.select(df_json.columns[:1])
df_version=df_json.select(df_json.columns[1:])
display(df_network)
display(df_version)
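The two parts can then be saved with the same write calls as in the question (a sketch reusing the question's mount paths; note that save() produces a directory of part files rather than a single JSON file):
df_network.write.format("json").save("/mnt/BigData_JSONFiles/SampleDatafilefrombigfile_detail.json")
df_version.write.format("json").save("/mnt/BigData_JSONFiles/SampleDatafilefrombigfile_header.json")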
Dataframes:
Result after writing to the JSON files:
Regarding python - Not able to split the JSON file successfully in Databricks using python, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/75233823/