apache-spark - 每行计算并在 DataFrame PySpark 中添加新列 - 更好的解决方案?

标签 apache-spark dataframe pyspark apache-spark-sql rdd

我在 PySpark 中使用 Data Frame 我有以下任务:检查所有列中每列的“次数”值 > 2 的次数。对于 u1 它是 0,对于 u2 => 2 等等

user    a   b   c   d   times
   u1   1   0   1   0   0
   u2   0   1   4   3   2
   u3   2   1   7   0   1

我的解决方案如下。它有效,我不确定这是最好的方法并且还没有尝试真正的大数据。我不喜欢转换为 rdd 并返回数据框。有更好的吗?我一开始想按每列的 UDF 计算,但没有找到一种方法来计算每行的所有结果:

def calculate_times(row):
    times = 0
    for index, item in enumerate(row):
        if not isinstance(item, basestring):
           if item > 2:
             times = times+1

return times    

def add_column(pair):
    return dict(pair[0].asDict().items() + [("is_outlier", pair[1])])   

def calculate_times_for_all(df): 
    rdd_with_times = df.map(lambda row: (calculate_times(row))
    rdd_final = df.rdd.zip(rdd_with_times).map(add_column)

    df_final = sqlContext.createDataFrame(rdd_final)
    return  df_final

对于这个解决方案,我使用了这个主题 How do you add a numpy.array as a new column to a pyspark.SQL DataFrame?

谢谢!

最佳答案

这只是一个简单的单行代码。示例数据:

df = sc.parallelize([
    ("u1", 1, 0, 1, 0), ("u2", 0, 1, 4, 3), ("u3", 2, 1, 7, 0)
]).toDF(["user", "a", "b", "c", "d"])

withColumn:

df.withColumn("times", sum((df[c] > 2).cast("int") for c in df.columns[1:]))

结果:

+----+---+---+---+---+-----+
|user|  a|  b|  c|  d|times|
+----+---+---+---+---+-----+
|  u1|  1|  0|  1|  0|    0|
|  u2|  0|  1|  4|  3|    2|
|  u3|  2|  1|  7|  0|    1|
+----+---+---+---+---+-----+

注意:

它的列是 nullable 你应该纠正它,例如使用 coalesce:

from pyspark.sql.functions import coalesce

sum(coalesce((df[c] > 2).cast("int"), 0) for c in df.columns[1:])

关于apache-spark - 每行计算并在 DataFrame PySpark 中添加新列 - 更好的解决方案?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41780521/

相关文章:

python-2.7 - python : how to keep leading zeros with dataframe. to_csv

python - 多个条件下的数据框切片 Python

scala - 如何使用 withColumn 创建新列以将两个数字列集中为 String ?

r - 数据帧 : How to compare current row to some other rows without looping?

python - 将 PythonRDD 列表减少为一个列表

apache-spark - pyspark rdd/dataframe 不会自动在 cassandra 中创建表

apache-spark - 如何刷新 HDFS 路径?

apache-spark - 如何在pyspark中扩展月份

apache-spark - Spark 执行器内存减少到 1/2

java - JavaSparkContext.wholeTextFiles 的数据集 API 模拟