python - 如何检测十进制列是否应转换为整数或 double ?

标签 python elasticsearch pyspark apache-spark-sql

我使用 Apache spark 作为 ETL 工具,将表从 Oracle 提取到 Elasticsearch

我遇到了数字列的问题,spark 将它们识别为 decimalElasticsearch 不接受 decimal 类型;所以我将每个 decimal 列转换为 doubleElasticsearch 可以接受。

dataFrame = dataFrame.select(
    [col(name) if 'decimal' not in colType else col(name).cast('double') for name, colType in dataFrame.dtypes]
)

当前每个数字列都是double的问题;它是否具有十进制值。

我的问题是有什么方法可以检测列类型应该转换为整数类型还是 double 类型?

最佳答案

您可以从数据帧的架构中检索数据类型 == DecimalType() 的所有列名称,请参见下面的示例(在 Spark 2.4.0 上测试):

更新:只需使用 df.dtypes 即可检索信息。

from pyspark.sql.functions import col

df = spark.createDataFrame([ (1, 12.3, 1.5, 'test', 13.23) ], ['i1', 'd2', 'f3', 's4', 'd5'])

df = df.withColumn('d2', col('d2').astype('decimal(10,1)')) \
       .withColumn('d5', col('d5').astype('decimal(10,2)'))
#DataFrame[i1: bigint, d2: decimal(10,1), f3: double, s4: string, d5: decimal(10,2)]

decimal_cols = [ f[0] for f in df.dtypes if f[1].startswith('decimal') ]

print(decimal_cols)
['d2', 'd5']

只是跟进:上述方法不适用于arraystruct 和嵌套数据结构。如果 struct 中的字段名称不包含空格、点等字符,则可以直接使用 df.dtypes 中的类型。

import re
from pyspark.sql.functions import array, struct, col

decimal_to_double = lambda x: re.sub(r'decimal\(\d+,\d+\)', 'double', x)

df1 = df.withColumn('a6', array('d2','d5')).withColumn('s7', struct('i1','d2'))
# DataFrame[i1: bigint, d2: decimal(10,1), l3: double, s4: string, d5: decimal(10,2), a6: array<decimal(11,2)>, s7: struct<i1:bigint,d2:decimal(10,1)>]

df1.select(*[ col(d[0]).astype(decimal_to_double(d[1])) if 'decimal' in d[1] else col(d[0]) for d in df1.dtypes ])
# DataFrame[i1: bigint, d2: double, l3: double, s4: string, d5: double, a6: array<double>, s7: struct<i1:bigint,d2:double>]

但是,如果 StructType() 的任何字段名称包含空格 等,上述方法可能无法正常工作。在这种情况下,我建议您检查:df.schema.jsonValue()['fields'] 以检索和操作 JSON 数据以进行 dtype 转换。

关于python - 如何检测十进制列是否应转换为整数或 double ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58048745/

相关文章:

python - 在 Python 中压缩整数列表

python - Firebase Cloud Functions Python - 无法添加依赖项

python - Django .save() 不可预测地处理 update_fields 输入

python - 与长时间运行的 python 脚本交互

elasticsearch - 在elasticsearch中创建索引时自定义分析器的mapper_parsing_exception?

csv - Spark : spark-csv takes too long

elasticsearch - ElasticSearch.Net NEST BulkInsert文档中没有ID

php - 在Elastic Search中进行术语查询

macos - 无法运行 pyspark : Failed to find Spark jars directory

python - 如何从 pyspark 的列中删除连字符?