apache-spark - 如何将一个 RDD 拆分为两个或多个 RDD?

标签 apache-spark pyspark rdd

我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD。

如果您熟悉 SAS,则如下所示:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;

这导致了两个不同的数据集。必须立即坚持才能得到我想要的结果......

最佳答案

不可能从单个转换中产生多个 RDD*。如果要拆分 RDD,则必须应用 filter对于每个拆分条件。例如:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果您只有一个二进制条件并且计算很昂贵,您可能更喜欢这样的东西:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
这意味着只有一个谓词计算,但需要额外传递所有数据。
重要的是要注意,只要输入 RDD 被正确缓存并且没有关于数据分布的额外假设,就重复过滤器和带有嵌套 if-else 的 for 循环之间的时间复杂度而言,没有显着差异。
使用 N 个元素和 M 个条件,您必须执行的操作数显然与 N 乘以 M 成正比。在 for 循环的情况下,它应该更接近 (N + MN)/2 并且重复过滤器正好是 NM,但在最后这一天只不过是 O(NM)。你可以看到我与 Jason Lenderman 的讨论**阅读一些利弊。
在非常高的层次上,您应该考虑两件事:
  • Spark 转换是惰性的,直到你执行一个 Action 你的 RDD 才被物化
    为什么这有关系?回到我的例子:
     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    
    如果以后我决定我只需要 rdd_odd那么就没有理由实现rdd_even .
    如果您查看 SAS 示例来计算 work.split2您需要实现输入数据和 work.split1 .
  • RDD 提供声明式 API。当您使用 filtermap如何执行此操作完全取决于 Spark 引擎。只要传递给转换的函数没有副作用,它就创造了优化整个管道的多种可能性。

  • 归根结底,这种情况并不足以证明其自身的转型是合理的。
    这个带有过滤器模式的映射实际上是在一个核心 Spark 中使用的。查看我对 How does Sparks RDD.randomSplit actually split the RDD 的回答和一个 relevant partrandomSplit方法。
    如果唯一的目标是实现输入拆分,则可以使用 partitionBy DataFrameWriter的条款哪种文本输出格式:
    def makePairs(row: T): (String, String) = ???
    
    data
      .map(makePairs).toDF("key", "value")
      .write.partitionBy($"key").format("text").save(...)
    

    * Spark 中只有 3 种基本类型的转换:
  • RDD[T] => RDD[T]
  • RDD[T] => RDD[U]
  • (RDD[T], RDD[U]) => RDD[W]

  • 其中 T、U、W 可以是原子类型或 products/元组(K,V)。任何其他操作都必须使用上述的某种组合来表达。您可以查看 the original RDD paper更多细节。
    ** https://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
    *** 另见 Scala Spark: Split collection into several RDD?

    关于apache-spark - 如何将一个 RDD 拆分为两个或多个 RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32970709/

    相关文章:

    python - 如何将三个 RDD 连接到一个元组中?

    scala - 在 Spark RandomForestClassifier 中预测类别概率

    apache-spark - Pyspark 找不到数据源 : kafka

    scala - SQLContext 隐式

    python - 将标准 python 键值字典列表转换为 pyspark 数据框

    python - 无法更新 Pyspark 中的变量

    clojure - 对 Clojure 序列进行解元组

    python - 想要在spark python中将字符串值转换为 float

    python - 语料库中的 Pyspark CountVectorizer 和词频

    java - Spark 数据集自定义分区器