apache-spark - 根据条件组合 Spark 数据框列中的多行

标签 apache-spark pyspark apache-spark-sql

我正在尝试根据条件在 spark 数据框中组合多行:

这是我的数据框(df):

|username | qid | row_no | text  |
 ---------------------------------
|  a      | 1   |  1     | this  |
|  a      | 1   |  2     |  is   |
|  d      | 2   |  1     |  the  |
|  a      | 1   |  3     | text  |
|  d      | 2   |  2     |  ball |

我希望它看起来像这样

|username | qid | row_no | text        |
 ---------------------------------------
|   a     | 1   |  1,2,3 | This is text|
|   b     | 2   |  1,2   | The ball    |

我正在使用 spark 1.5.2 它没有 collect_list 功能

最佳答案

collect_list 仅在 1.6 中出现。

我会检查底层的 RDD。方法如下:

data_df.show()
+--------+---+------+----+
|username|qid|row_no|text|
+--------+---+------+----+
|       d|  2|     2|ball|
|       a|  1|     1|this|
|       a|  1|     3|text|
|       a|  1|     2|  is|
|       d|  2|     1| the|
+--------+---+------+----+

然后这个

reduced = data_df\
    .rdd\
    .map(lambda row: ((row[0], row[1]), [(row[2], row[3])]))\
    .reduceByKey(lambda x,y: x+y)\
    .map(lambda row: (row[0], sorted(row[1], key=lambda text: text[0]))) \
    .map(lambda row: (
            row[0][0], 
            row[0][1], 
            ','.join([str(e[0]) for e in row[1]]),
            ' '.join([str(e[1]) for e in row[1]])
        )
    )

schema_red = typ.StructType([
        typ.StructField('username', typ.StringType(), False),
        typ.StructField('qid', typ.IntegerType(), False),
        typ.StructField('row_no', typ.StringType(), False),
        typ.StructField('text', typ.StringType(), False)
    ])

df_red = sqlContext.createDataFrame(reduced, schema_red)
df_red.show()

以上产生了以下内容:

+--------+---+------+------------+
|username|qid|row_no|        text|
+--------+---+------+------------+
|       d|  2|   1,2|    the ball|
|       a|  1| 1,2,3|this is text|
+--------+---+------+------------+

在 Pandas 中

df4 = pd.DataFrame([
        ['a', 1, 1, 'this'],
        ['a', 1, 2, 'is'],
        ['d', 2, 1, 'the'],
        ['a', 1, 3, 'text'],
        ['d', 2, 2, 'ball']        
    ], columns=['username', 'qid', 'row_no', 'text'])

df_groupped=df4.sort_values(by=['qid', 'row_no']).groupby(['username', 'qid'])

df3 = pd.DataFrame()
df3['row_no'] = df_groupped.apply(lambda row: ','.join([str(e) for e in row['row_no']]))
df3['text']   = df_groupped.apply(lambda row: ' '.join(row['text']))

df3 = df3.reset_index()

关于apache-spark - 根据条件组合 Spark 数据框列中的多行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43356725/

相关文章:

apache-spark - 使用pyspark分组,排序和汇总Spark数据框架

sql - 窗口功能的默认窗口框架是什么

dataframe - 在 DataFrame 上描述 vs printSchema 方法

apache-spark - 将包含 JSON 字符串的列拆分为每个包含字符串中的一个键值对的列

不支持 java.util.Date

python - Spark 作业在 rdd takeSample 上无限期挂起

apache-spark-sql - spark 2.4.0 为左连接提供 "Detected implicit cartesian product"异常,右 DF 为空

apache-spark - 无法在 Spark 结构化流中转换 Kafka Json 数据

apache-spark - 理解 Spark SQL 中向量列的表示

json - Pyspark:读取对象之间没有分隔符的 JSON 数据文件