apache-spark - 如何选择所有列而不是硬编码每一列?

标签 apache-spark pyspark apache-spark-sql

PySpark Dataframe 采用以下格式:

enter image description here

要访问列 c1,c2,c3 的 stddev 行,我使用:

df.describe().createOrReplaceTempView("table1")

df2 = sqlContext.sql("SELECT c1 AS f1, c2 as f2, c3 as f3 from table1")
ddd = df2.rdd.map(lambda x : (float(x.f1) , float(x.f2) , float(x.f3))).zipWithIndex().filter(lambda x: x[1] == 2).map(lambda x : x[0])
print type(ddd)
print type(ddd.collect())
print ddd.collect()

这打印:
<class 'pyspark.rdd.PipelinedRDD'>
<type 'list'>
[(0.7071067811865476, 0.7071067811865476, 0.7071067811865476)]

如何为所有列选择 stddev 值:c1,c2,c3,c4,c5 并为这些选择生成数据类型 [(0.7071067811865476, 0.7071067811865476, 0.7071067811865476.... 而不是将每个值硬编码到 SQL 字符串中?所以列数可以是可变的:5、10 列等...

要为 5 列完成此操作,我认为使用 "SELECT c1 AS f1, c2 as f2, c3 as f3, c4 as f4, c5 as f5 from table1" 但是否有更简洁的方法,而不是在 SQL 中对每个值进行硬编码,然后在生成 rdd 时相应地对值进行硬编码:df2.rdd.map(lambda x : (float(x.f1) , float(x.f2).....
因为我的解决方案不适用于不同长度的列。

最佳答案

为什么不直接使用 SQL 聚合?要么使用 agg

from pyspark.sql.functions import stddev

df.agg(*[stddev(c) for c in df.columns]).first()

其中 * 用于 agg(*exprs)select 的参数解包:

df.select([stddev(c) for c in df.columns]).first()

要删除名称,请将 Row 转换为纯 tuple :

tuple(df.select(...).first())

或者

df.select(...).rdd.map(tuple).first()

关于apache-spark - 如何选择所有列而不是硬编码每一列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42380748/

相关文章:

apache-spark - 为什么 Apache Livy session 显示应用程序 ID 为 NULL?

scala - 如何在 Spark 中显示 KeyValueGroupedDataset?

apache-spark - Pyspark:spark-submit 不像 CLI 那样工作

python - 如何在不聚合原始 RDD 分区的情况下对多个 RDD 进行分组?

java - Spark 执行器: Invalid initial heap size: -Xms0M

scala - 如何枚举HDFS目录中的文件

python - 如何让 matplotlib 在 AWS EMR Jupyter notebook 中工作?

python - 如何通过多值列过滤JSON数据

scala - 根据条件总结DataFrame的值

r - spark SQL : sqlContext. sql 的 SparklyR 包装器