apache-spark - 如何从Apache Spark的数据框中选择大小相同的分层样本?

标签 apache-spark pyspark spark-dataframe

我在Spark 2中有一个数据框,如下所示,其中用户有50到数千个帖子。我想创建一个新数据框,将所有用户都保留在原始数据框中,但每个用户只有5个随机采样的帖子。

+--------+--------------+--------------------+
| user_id|       post_id|                text|
+--------+--------------+--------------------+
|67778705|44783131591473|some text...........|
|67778705|44783134580755|some text...........|
|67778705|44783136367108|some text...........|
|67778705|44783136970669|some text...........|
|67778705|44783138143396|some text...........|
|67778705|44783155162624|some text...........|
|67778705|44783688650554|some text...........|
|68950272|88655645825660|some text...........|
|68950272|88651393135293|some text...........|
|68950272|88652615409812|some text...........|
|68950272|88655744880460|some text...........|
|68950272|88658059871568|some text...........|
|68950272|88656994832475|some text...........|
+--------+--------------+--------------------+

posts.groupby('user_id').agg(sample('post_id'))这样的东西,但是在pyspark中没有这样的功能。

有什么建议吗?

更新:

这个问题与另一个紧密相关的问题stratified-sampling-in-spark在两个方面不同:
  • 在上面的另一个问题中,它询问的是不成比例的分层抽样,而不是普通的成比例方法。
  • 它询问有关在Spark的Dataframe API中而不是RDD中执行此操作的问题。

  • 我也更新了问题的标题以澄清这一点。

    最佳答案

    使用sampleBy将产生近似解决方案。这是一种替代方法,比上面的方法有些棘手,但总是导致样本大小完全相同。

    import org.apache.spark.sql.functions.row_number
    import org.apache.spark.sql.expressions.Window
    
    df.withColumn("row_num",row_number().over(Window.partitionBy($"user_id").orderBy($"something_random"))
    

    如果您还没有随机ID,则可以使用org.apache.spark.sql.functions.rand创建一个具有随机值的列,以保证您的随机采样。

    关于apache-spark - 如何从Apache Spark的数据框中选择大小相同的分层样本?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41516805/

    相关文章:

    scala - 如何使用mapGroups在scala Spark中的groupby之后计算列中的不同值

    java - Spark Kafka 流媒体问题

    python - 使用 Spark DataFrames 对多个字符串分类特征进行一次性编码

    sql - 使用 Spark SQL 将一列拆分为多列

    numpy - 是否可以在Spark Dataframe列中存储一个numpy数组?

    apache-spark - Spark : regression model threshold and precision

    python - Spark 中的分区和分桶有什么区别?

    java - 在 Spark 中将数据集应用为广播

    python - 如何更新 Mac 上 PyCharm 中运行的 pyspark 使用的 Java keystore ?

    python - Pyspark 'NoneType'对象没有属性 '_jvm'错误