python - 无法在DataBricks中使用python成功分割JSON文件

标签 python azure pyspark databricks

嗨,我正在编写一个 DATABRICKS Python 代码,它选择巨大的 JSON 文件并将其分为两部分。这意味着从一个文件的索引 0 或“reporting_entity_name”到索引 3 或“version”,以及从另一个文件的索引 4 到最后。虽然它成功地将文件与 json 文件的索引 1 分开,但是当我提供索引 0 时,它失败并显示

Datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s).

这是大型 JSON 文件的示例数据。

{
  "reporting_entity_name": "launcher",
  "reporting_entity_type": "launcher",
  "last_updated_on": "2020-08-27",
  "version": "1.0.0",
  "in_network": [
    {
      "negotiation_arrangement": "ffs",
      "name": "Boosters",
      "billing_code_type": "CPT",
      "billing_code_type_version": "2020",
      "billing_code": "27447",
      "description": "Boosters On Demand",
      "negotiated_rates": [
        {
          "provider_groups": [
            {
              "npi": [
                0
              ],
              "tin": {
                "type": "ein",
                "value": "11-1111111"
              }
            }
          ],
          "negotiated_prices": [
            {
              "negotiated_type": "negotiated",
              "negotiated_rate": 123.45,
              "expiration_date": "2022-01-01",
              "billing_class": "organizational"
            }
          ]
        }
      ]
    }
  ]
}

这是Python代码。

from pyspark.sql.functions import explode, col
import itertools

# Read the JSON file from Databricks storage
df_json = spark.read.option("multiline","true").json("/mnt/BigData_JSONFiles/SampleDatafilefrombigfile.json")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

# Convert the dataframe to a dictionary
data = df_json.toPandas().to_dict()

# Split the data into two parts
d1 = dict(itertools.islice(data.items(), 1))
d2 = dict(itertools.islice(data.items(), 1, len(data.items())))

# Convert the first part of the data back to a dataframe
df1 = spark.createDataFrame([d1])

# Write the first part of the data to a JSON file in Databricks storage
df1.write.format("json").save("/mnt/BigData_JSONFiles/SampleDatafilefrombigfile_detail.json")

# Convert the second part of the data back to a dataframe
df2 = spark.createDataFrame([d2])

# Write the second part of the data to a JSON file in Databricks storage
df2.write.format("json").save("/mnt/BigData_JSONFiles/SampleDatafilefrombigfile_header.json")

这是两个文件的输出。在输出文件中,您可以在详细信息文件中看到它应该只包含“in_network”的数据,但它也有 0 索引数据,即“reporting_entity_name”,它不应该在详细文件中,而应该在头文件中。

{
"in_network": [
    {
      "negotiation_arrangement": "ffs",
      "name": "Boosters",
      "billing_code_type": "CPT",
      "billing_code_type_version": "2020",
      "billing_code": "27447",
      "description": "Boosters On Demand",
      "negotiated_rates": [
        {
          "provider_groups": [
            {
              "npi": [
                0
              ],
              "tin": {
                "type": "ein",
                "value": "11-1111111"
              }
            }
          ],
          "negotiated_prices": [
            {
              "negotiated_type": "negotiated",
              "negotiated_rate": 123.45,
              "expiration_date": "2022-01-01",
              "billing_class": "organizational"
            }
          ]
        }
      ]
    }
  ]
},"negotiation_arrangement":"ffs"}]}}

头文件的输出,从 1 个索引开始并给出输出。

{"reporting_entity_type": "launcher",
  "last_updated_on": "2020-08-27",
  "version": "1.0.0"}

请帮助我解决这个错误。

有关代码的指导将会很有帮助。

这是大型 json 文件的屏幕截图,它是上面附加文件的精确副本,我将集群从 2 GB 增加到 8 GB。但文件中 in_network 内的 dict 也出现同样的错误 714 次。但为什么它在大文件中失败了。如果完全一样的话。 我也更改了答案的代码行

df_network=df_json.select(df_json.columns[714:])

enter image description here

这是回溯

AnalysisException                         Traceback (most recent call last)
<command-863551447189973> in <cell line: 13>()
     11 df_version=df_json.select(df_json.columns[:1])
     12 
---> 13 df_network.write.format("json").save("/mnt/BigData_JSONFiles/2022-10_040_05C0_in-network-rates_2_of_2_detail.json")
     14 df_version.write.format("json").save("/mnt/BigData_JSONFiles/2022-10_040_05C0_in-network-rates_2_of_2_header.json")
     15 display(df_network)

/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46             start = time.perf_counter()
     47             try:
---> 48                 res = func(*args, **kwargs)
     49                 logger.log_success(
     50                     module_name, class_name, function_name, time.perf_counter() - start, signature

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    966             self._jwrite.save()
    967         else:
--> 968             self._jwrite.save(path)
    969 
    970     @since(1.4)

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    200                 # Hide where the exception came from that shows a non-Pythonic
    201                 # JVM exception message.
--> 202                 raise converted from None
    203             else:
    204                 raise

AnalysisException: 
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).

最佳答案

我已经复制了您的代码,并得到了一个文件的以下结果。

{"last_updated_on":{"0":"2020-08-27"},"reporting_entity_name":{"0":"launcher"},"reporting_entity_type":{"0":"launcher"},"version":{"0":"1.0.0"}}

enter image description here

内部0键可能是由于使用了字典和pandas。

由于您的 JSON 具有相同的结构,因此您可以尝试以下解决方法来使用 select 来划分 JSON,而不是转换为字典。

这是来自 JSON 文件的原始数据框。

enter image description here

因此,使用 select 生成所需的 JSON 文件。

df_network=df_json.select(df_json.columns[:1])
df_version=df_json.select(df_json.columns[1:])
display(df_network)
display(df_version)

数据框:

enter image description here

写入 JSON 文件后的结果:

enter image description here

enter image description here

关于python - 无法在DataBricks中使用python成功分割JSON文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75233823/

相关文章:

python - 如何阻止 Google Cloud SQL 中的 SQL 注入(inject)?

python - 如何使用来自不同数据帧的条件将列添加到 pyspark 数据帧

python - 使用 MS C 与 MinGW 编译 Python 的速度差异

python - 使用非模型表单的隐藏表单字段

python - Pandas :计算数据框中的唯一值

azure - Azure 应用服务中部署的 Spring Boot Web 应用中的 HTTPS

c# - 如何在 Blazor 服务器应用程序中正确计算 SignalR 消息?

azure - 如何在azure模拟器中运行webjobs

python - 如何在 pyspark 中设置 spark.sql.parquet.output.committer.class

python - 在 Google Colab 中使用 pyspark