apache-spark - Spark 。将 RDD 拆分成批

标签 apache-spark rdd batching

我有 RDD,其中每条记录都是 int:

[0,1,2,3,4,5,6,7,8]

我需要做的就是将这个 RDD 分成几批。 IE。制作另一个 RDD,其中每个元素都是固定大小的元素列表:
[[0,1,2], [3,4,5], [6,7,8]]

这听起来微不足道,但是,我最近几天感到困惑,除了以下解决方案之外找不到任何东西:
  • 使用 ZipWithIndex 枚举 RDD 中的记录:
    [0,1,2,3,4,5] -> [(0, 0),(1, 1),(2, 2),(3, 3),(4, 4),(5, 5)]
  • 使用 map() 迭代这个 RDD 并计算索引,如 index = int(index / batchSize)[1,2,3,4,5,6] -> [(0, 0),(0, 1),(0, 2),(1, 3),(1, 4),(1, 5)]
  • 然后按生成的索引分组。
    [(0, [0,1,2]), (1, [3,4,5])]

  • 这将为我提供我需要的东西,但是,我不想在这里使用 group。当您使用普通 Map Reduce 或某些抽象(如 Apache Crunch)时,这很简单。但是有没有办法在不使用重分组的情况下在 Spark 中产生类似的结果?

    最佳答案

    您没有清楚地解释为什么需要固定大小的 RDD,这取决于您要完成的工作,可能有更好的解决方案,但是为了回答所提出的问题,我看到以下选项:
    1) 根据项目数和批次大小实现过滤器。例如,如果您在原始 RDD 中有 1000 个项目并希望将它们拆分为 10 个批次,您最终将应用 10 个过滤器,第一个检查索引是否为 [0, 99],第二个检查 [100, 199]等等。应用每个过滤器后,您将拥有一个 RDD。重要的是要注意原始 RDD 可能在过滤之前被缓存。优点:每个生成的 RDD 可以单独处理,不必在一个节点上完全分配。缺点:这种方法随着批次数量的增加而变慢。
    2)逻辑上与此类似,但不是过滤器,您只需实现一个自定义分区器,该分区器根据此处所述的索引(键)返回分区 ID:Custom partitioner for equally sized partitions .优点:比过滤器快。缺点:每个分区必须适合一个节点。
    3)如果原始RDD中的顺序不重要,只需要大致相同的分块,你可以合并/重新分区,解释在这里https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html

    关于apache-spark - Spark 。将 RDD 拆分成批,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47266098/

    相关文章:

    performance - 在spark中使用scala将预测结果保存到HDFS表中在yarn-cluster模式下非常慢

    java - 使用 Spark 和 RDD 映射 cassandra 数据库的表

    python - spark中哪个函数用于通过key组合两个RDD

    .net - 多态性是否会干扰 NHibernate 的批量插入/更新功能?

    apache-spark - Spark RDD 以独占方式按键分区

    apache-spark - Spark中的二级排序

    postgresql - 集群中有 20 个分区但没有工作人员被使用的 RDD

    go - Boltdb 中的批处理操作

    batching - 为整个网格而不是每个顶点设置颜色

    Scala - 'this' 在 Scala 中对于事件对象可以为 null 吗?