python - 使用 Spark 获取值超过某个阈值的所有列的名称

标签 python apache-spark pyspark emr

背景

我们正在将数据从 Redshift 卸载到 S3,然后将其加载到数据帧中,如下所示:

df = spark.read.csv(path, schema=schema, sep='|')

我们将 PySpark 和 AWS EMR(版本 5.4.0)与 Spark 2.1.0 结合使用。

问题

我有一个 Redshift 表,正在以 CSV 格式读入 PySpark。记录采用这种格式:

url,category1,category2,category3,category4
http://example.com,0.6,0.0,0.9,0.3

url 是 VARCHAR,category 值是介于 0.0 和 1.0 之间的 FLOAT。

我想要做的是生成一个新的 DataFrame,每个类别只有一行,其中原始数据集中的值高于某个阈值 X。例如,如果阈值设置为 0.5,那么我希望我的新数据集看起来像这样:

url,category
http://example.com,category1
http://example.com,category3

我是 Spark/PySpark 的新手,所以我不确定如何/如果可以这样做,我们将不胜感激!

编辑:

想添加我的解决方案(基于 Pushkr 的代码)。我们有大量类别要加载,因此为了避免对每个选择进行硬编码,我执行了以下操作:

parsed_df = None
for column in column_list:
    if not parsed_df:
        parsed_df = df.select(df.url, when(df[column]>threshold,column).otherwise('').alias('cat'))
    else:
        parsed_df = parsed_df.union(df.select(df.url, when(df[column]>threshold,column).otherwise('')))
if parsed_df is not None:
    parsed_df = parsed_df.filter(col('cat') != '')

其中 column_list 是之前生成的类别列名称列表,threshold 是选择类别所需的最小值。

再次感谢!

最佳答案

这是我试过的东西 -

data = [('http://example.com',0.6,0.0,0.9,0.3),('http://example1.com',0.6,0.0,0.9,0.3)]

df = spark.createDataFrame(data)\
     .toDF('url','category1','category2','category3','category4')

from pyspark.sql.functions import *



df\
    .select(df.url,when(df.category1>0.5,'category1').otherwise('').alias('category'))\
    .union(\
    df.select(df.url,when(df.category2>0.5,'category2').otherwise('')))\
    .union(\
    df.select(df.url,when(df.category3>0.5,'category3').otherwise('')))\
    .union(\
    df.select(df.url,when(df.category4>0.5,'category4').otherwise('')))\
    .filter(col('category')!= '')\
    .show()

输出:

+-------------------+---------+
|                url| category|
+-------------------+---------+
| http://example.com|category1|
|http://example1.com|category1|
| http://example.com|category3|
|http://example1.com|category3|
+-------------------+---------+

关于python - 使用 Spark 获取值超过某个阈值的所有列的名称,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43613956/

相关文章:

apache-spark - 如何在具有不同内存和核心数量的集群上调整 spark 作业

python - 有没有一种简单的方法可以覆盖列表对象的方法 __getitem__?

python - WSGI无法导入本地安装的包

hadoop - 没有hadoop apache spark可以运行吗?

performance - 随着硬件规模的扩大,Spark 的性能会变慢

apache-spark - 将数据框保存为外部配置单元表

python - Python中Spark RDD的列操作

python - Flask SqlAlchemy 多对多关系在按关系名称访问时只返回一个结果

python - 按特定条件的出现对数据框进行分组

python - pyspark,逻辑回归,如何获得各个特征的系数