python - 在 python 或 Spark 中获取大数据缺失值的最快方法是什么?

标签 python pyspark bigdata

我正在处理多个数据表,这些数据表的行数超过 2000 万行,列数超过 30 列。 目前我正在使用 python pyspark 来计算 Null 值并计算缺失率。 但一列检查需要40分钟。如果有其他更好的方法来处理它以使其更快,请告诉我。欣赏它。

我当前的代码是:

variables = ['A', 'B', ....]
for variable in variables:
    column = pandas.read_sql_query('select %s from dbo.ORDERS' % (variable), con=cnxn)
    column_missing = column.filter(column[variable].isnull()).count()
    total = len(column)
    missing = len(column_missing)

编辑后的代码:

import pyodbc
import pandas
import numpy
import datetime
import time
from pyspark.sql.functions import lit, col

order_pk = pandas.read_sql_query('select %s from dbo.ORDERS' % ('ORDER_PK'), con=cnxn)
summary = order_pk.describe().filter(col('ORDER_PK') == "count")
summary.select(*((lit(rows)-col(c)).alias(c) for c in order_pk.columns)).show()

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-12-30285cf9f59e> in <module>()
----> 1 summary = order_pk.describe().filter(col('ORDER_PK') == "count")
      2 summary.select(*((lit(rows)-col(c)).alias(c) for c in order_pk.columns)).show()

c:\users\pcui\appdata\local\programs\python\python37\lib\site-packages\pyspark\sql\functions.py in _(col)
     40     def _(col):
     41         sc = SparkContext._active_spark_context
---> 42         jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
     43         return Column(jc)
     44     _.__name__ = name

AttributeError: 'NoneType' object has no attribute '_jvm'

最佳答案

由于您尝试在循环中建立连接,因此需要花费大量时间。

from pyspark.sql.functions import lit
rows = df.count()
summary = df.describe().filter(col(variable) == "count")
summary.select(*((lit(rows)-col(c)).alias(c) for c in df.columns)).show()

您可以使用这个而不是column.isnull().sum(),因为它需要很长时间。

编辑:请使用 SQLAlchemy 查询数据库以获取每个变量的计数。一次性完成是不错的选择。

关于python - 在 python 或 Spark 中获取大数据缺失值的最快方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52332373/

相关文章:

python - 星火/PySpark : An error occurred while trying to connect to the Java server (127. 0.0.1:39543)

python - 如何将属性方法用作查询中的字段?

python - 如何将列表元素添加到字典中

sql - 如何在 Spark SQL 中格式化日期?

apache-spark - 我可以动态更改 SparkContext.appName 吗?

postgresql - OLAP 和 postgresql- 工具还是方法论?

python - 如何在 Python 中对复制的 MagicMock 调用方法?

python - 如果 "null=True",存储在 DB 中,是 "NULL"还是 "empty values"?

apache-spark - Tachyon 默认情况下是否由 Apache Spark 中的 RDD 实现?

json - SPARK read.json 抛出 java.io.IOException : Too many bytes before newline