python - 限制 Spark 上下文中的记录数量

标签 python hadoop apache-spark pyspark rdd

我想减少每个reducer的记录数量,并将结果变量保留为rdd

使用 takeSample 似乎是显而易见的选择,但是,它返回一个 集合 而不是 SparkContext 对象。

我想出了这个方法:

rdd = rdd.zipWithIndex().filter(lambda x:x[1]<limit).map(lambda x:x[0])

但是,这种方法非常慢且效率不高。

是否有更智能的方法来获取小样本并保持数据结构为rdd

最佳答案

如果您想要一个示例子集,并且无法对数据做出任何其他假设,那么takeparallelize相结合可能是最佳解决方案:

sc.parallelize(rdd.take(n))

它将涉及相对较少数量的分区(在最好的情况下只有一个),并且小型 n 的网络流量成本应该可以忽略不计。

采样(randomSplitsample)将需要与使用 filterzipWithIndex 相同的完整数据扫描。

假设没有数据偏差,您可以尝试类似的方法来解决这个问题:

from __future__ import division  # Python 2 only

def limitApprox(rdd, n, timeout):
    count = rdd.countApprox(timeout)
    if count <= n:
        return rdd
    else:
        rec_per_part = count // rdd.getNumPartitions()
        required_parts = n / rec_per_part if rec_per_part else 1
        return rdd.mapPartitionsWithIndex(
            lambda i, iter: iter if i < required_parts else []
        )
  • 这仍然会访问每个分区,但如果没有必要,会尝试避免计算内容
  • 如果数据偏差较大,则不起作用
    • 如果分布均匀,但 n << 超过每个分区的平均记录数,则可能会超出所需的数量。
    • 如果分布偏向高指数,则可能会出现采样不足的情况。

如果数据可以表示为,您可以尝试另一个技巧:

rdd.toDF().limit(n).rdd

关于python - 限制 Spark 上下文中的记录数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35871095/

相关文章:

hadoop - 在 spark yarn 集群中,容器如何工作取决于 RDD 分区的数量?

hadoop - HiBench 基准套件错误 : INPUT_HDFS: unbound variable

scala - Spark DataFrame 过滤 : retain element belonging to a list

scala - 如何使用Scala在Spark 2.1中将以毫秒为单位的字符串列转换为以毫秒为单位的时间戳?

java - 使用 Scala/Java API 基于外部数组的内容进行自定义排序

python - 阻塞 channel 与异步消息传递

python - pandas 数据框按索引和整数

python - 如何将密码传递给 Pyramid 和 Cornice?

hadoop - 如何在没有 hadoop 的情况下使用 Hive

python - Seaborn 热图未显示从字符串转换为数字的列