java - Apache Spark : How to structure code of a Spark Application (especially when using Broadcasts)

标签 java performance optimization apache-spark

我有一个关于 Java Spark 应用程序中代码结构的一般性问题。我想将 Spark 转换的实现代码与 RDD 的调用分开,这样即使使用包含大量代码行的大量转换,应用程序的源代码也能保持清晰。

我先给你一个简短的例子。在这种情况下,flatMap 转换的实现是作为匿名内部类提供的。这是一个简单的应用程序,它读取一个整数 RDD,然后将每个元素乘以一个整数数组,该数组之前广播到所有工作节点:

public static void main(String[] args) {

    SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));

    final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });

    result = result.flatMap(new FlatMapFunction<Integer, Integer>() {
        public Iterable<Integer> call(Integer t) throws Exception {
            int[] values = factors.value();
            LinkedList<Integer> result = new LinkedList<Integer>();
            for (int value : values) result.add(t * value);
            return result;
        }
    });

    System.out.println(result.collect());   // [5, 10, 15, 8, 16, 24, 9, 18, 27]

    sc.close();
}

为了构建代码,我将 Spark 函数的实现提取到了另一个类中。 SparkFunctions 类提供了 flatMap 转换的实现,并有一个 setter 方法来获取对广播变量的引用(......在我的真实场景中,这个类中会有很多操作,所有访问广播数据)。

我的经验是,表示 Spark 转换的方法可以是静态的,只要它不访问 Broadcast 变量或 Accumulator 变量即可。为什么?静态方法只能访问静态属性。对 Broadcast 变量的静态引用始终为 null(可能是因为当 Spark 将类 SparkFunctions 发送到工作节点时它未被序列化)。

@SuppressWarnings("serial")
public class SparkFunctions implements Serializable {

    private Broadcast<int[]> factors;

    public SparkFunctions() {
    }

    public void setFactors(Broadcast<int[]> factors) {
        this.factors = factors;
    }

    public final FlatMapFunction<Integer, Integer> myFunction = new FlatMapFunction<Integer, Integer>() {
        public Iterable<Integer> call(Integer t) throws Exception {
            int[] values = factors.value();
            LinkedList<Integer> result = new LinkedList<Integer>();
            for (int value : values) result.add(t * value);
            return result;
        }
    };

}

这是使用 SparkFunctions 类的应用程序的第二个版本:

public static void main(String[] args) {

    SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));

    final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });

    // 1) Initializing
    SparkFunctions functions = new SparkFunctions();

    // 2) Pass reference of broadcast variable
    functions.setFactors(factors);

    // 3) Implementation is now in the class SparkFunctions
    result = result.flatMap(functions.myFunction);

    System.out.println(result.collect());   // [5, 10, 15, 8, 16, 24, 9, 18, 27]

    sc.close();
}

应用程序的两个版本都在工作(本地和集群设置),但我想问的是它们是否同样高效。

问题1:在我看来,Spark序列化类SparkFunctions,包括Broadcast变量,并将其发送到工作节点,以便节点可以在自己的函数中使用该函数任务。数据是否被两次发送到工作节点,第一次是使用SparkContext进行广播,然后是类SparkFunctions的序列化?还是每个元素发送一次(广播加 1 次)?

问题 2:您能否就源代码的其他结构提供建议?

请不要提供如何防止广播的解决方案。我有一个复杂得多的实际应用程序。

我发现的类似问题并没有真正帮助:

预先感谢您的帮助!

最佳答案

这是关于问题 1

当一个spark job被提交时,jobs被划分为stages->tasks。这些任务实际上是在工作节点上执行转换和操作。驱动程序的 sumbitTask() 将有关广播变量的函数和元数据序列化到所有节点。

广播工作原理剖析。

Driver 创建一个本地目录来存储要广播的数据,并启动一个可以访问该目录的 HttpServer。数据实际上是在调用广播时写入目录的(val bdata = sc.broadcast(data))。同时数据也以StorageLevel内存+磁盘的方式写入驱动的blockManger。 block 管理器为数据分配一个 blockId(BroadcastBlockId 类型)。

真正的数据只有在执行器反序列化它接收到的任务时才会广播,它还会以 Broadcast 对象的形式获取广播变量的元数据。然后调用元数据对象(bdata 变量)的 readObject() 方法。此方法将首先检查本地 block 管理器以查看是否已经存在本地副本。如果没有,数据将从驱动程序中获取。获取数据后,它会存储在本地 block 管理器中以供后续使用。

关于java - Apache Spark : How to structure code of a Spark Application (especially when using Broadcasts),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35754123/

相关文章:

java - 使用叠加层时如何获得更暗的背景?

java - 在依赖于另外两个项目的 Eclipse Java 项目中,如何解决 jar 冲突?

r - 如何使用 LpSolve 在 R 中设置线性规划优化?

c++ - 这个函数调用会被g++优化吗?

java - 对从互联网获取的文本进行 UTF 8 转换

python - 当数组非常大时,根据另一个数组的范围有效地分离数组的一部分

Python Pandas : Convert 2, 000,000 DataFrame 行到二进制矩阵 (pd.get_dummies()) 没有内存错误?

c# - 优化从表达式树生成的 Func.Invoke()

c++ - 使用清理代码优化多个导出点

java - 使用 Java 从 XML 中按属性值获取子节点