java - Spark中如何在不同的worker上运行任务?

标签 java apache-spark

我有以下 Spark 代码:

package my.spark;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ExecutionTest {    
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("ExecutionTest")
                .getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        int slices = 2;
        int n = slices;
        List<String> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            list.add("" + i);
        }

        JavaRDD<String> dataSet = jsc.parallelize(list, slices);

        dataSet.foreach(str -> {
            System.out.println("value: " + str);
            Thread.sleep(10000);
        });

        System.out.println("done");

        spark.stop();
    }

}

我已经使用以下命令运行主节点和两个工作节点(本地主机上的所有内容;Windows):

bin\spark-class org.apache.spark.deploy.master.Master

和(两次):

bin\spark-class org.apache.spark.deploy.worker.Worker spark://<local-ip>:7077

一切都正确开始。

使用命令提交作业后:

bin\spark-submit --class my.spark.ExecutionTest --master spark://<local-ip>:7077 file:///<pathToFatJar>/FatJar.jar

命令已启动,但 value: 0value: 1 输出由其中一名工作人员写入(如与工作人员关联的页面上的Logs > stdout 上所示)。第二个工作人员在 Logs > stdout 中没有任何内容。据我了解,这意味着每次迭代都是由同一个工作人员完成的。

如何在两个不同的正在运行的工作线程上运行这些任务?

最佳答案

这是可能的,但我不确定它是否每次都可以正常工作。然而,在测试时,每次都按预期工作。

我已经使用 Windows 10 x64 主机和 4 个虚拟机 (VM) 测试了我的代码:具有 Debian 9(延伸)内核 4.9.0 x64 的 VirtualBox、仅主机网络、Java 1.8.0_144、适用于 Hadoop 2.7 的 Apache Spark 2.2.0 (spark-2.2.0-bin-hadoop2.7.ta​​r.gz)。

我一直在虚拟机上使用主服务器和 3 个从服务器,在 Windows 上使用另外一个从服务器:

  • debian-master - 1 个 CPU,1 GB RAM
  • debian-slave1 - 1 个 CPU,1 GB RAM
  • debian-slave2 - 1 个 CPU,1 GB RAM
  • debian-slave3 - 2 个 CPU,1 GB RAM
  • windows-slave - 4 CPU,8 GB RAM

我正在将作业从 Windows 计算机提交到位于虚拟机上的主服务器。

开头和之前一样:

    SparkSession spark = SparkSession
            .builder()
            .config("spark.cores.max", coresCount) // not necessary
            .appName("ExecutionTest")
            .getOrCreate();

[重要] coresCount 对于分区至关重要 - 我必须使用已用核心的数量来对数据进行分区,而不是工作线程/执行程序的数量。

接下来,我必须创建 JavaSparkContext 和 RDD。重用 RDD 允许多次执行同一组工作线程。

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    JavaRDD<Integer> rddList
          = jsc.parallelize(
                    IntStream.range(0, coresCount * 2)
                             .boxed().collect(Collectors.toList()))
               .repartition(coresCount);

我创建了包含 coresCount * 2 元素的 rddList 。等于 coresCount 的元素数量不允许在所有关联的工作线程上运行(在我的例子中)。也许,coresCount + 1 就足够了,但我还没有测试它,因为 coresCount * 2 也不够。

接下来要做的是运行命令:

    List<String> hostsList
        = rddList.map(value -> {
                Thread.sleep(3_000);
                return InetAddress.getLocalHost().getHostAddress();
            })
            .distinct()
            .collect();

    System.out.println("-----> hostsList = " + hostsList);

Thread.sleep(3_000) 对于正确分配任务是必要的。 3秒对我来说就足够了。可能该值可能更小,有时可能需要更高的值(我猜该值取决于工作人员从主机获取任务执行的速度)。

上述代码将在与工作线程关联的每个核心上运行,因此每个工作线程不止一个。为了在每个工作线程上运行一个命令,我使用了以下代码:

/* as static field of class */
private static final AtomicBoolean ONE_ON_WORKER = new AtomicBoolean(false);

...

    long nodeCount
        = rddList.map(value -> {
                Thread.sleep(3_000);
                if (ONE_ON_WORKER.getAndSet(true) == false) {
                    System.out.println("Executed on "
                            + InetAddress.getLocalHost().getHostName());
                    return 1;
                } else {
                    return 0;
                }
            })
            .filter(val -> val != 0)
            .count();

    System.out.println("-----> finished using #nodes = " + nodeCount);

当然,最后,停止:

    spark.stop();

关于java - Spark中如何在不同的worker上运行任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46031549/

相关文章:

python - 为什么 python dataFrames' 只位于同一台机器上?

apache-spark - Kafka 和 Pyspark 集成

Java:小程序有哪些可用的安全设置

java - 如何在我的 Controller 中访问 POST 变量?

java - MigLayout,禁用 JButton 宽度调整大小

apache-spark - 如何使用 Supervisord 自动启动 Apache Spark 集群?

scala - 如何将kafka流转换为spark RDD或Spark Dataframe

scala - 使用正则表达式基于另一个 RDD 过滤一个 RDD

java - 我如何开始使用 plivo 录制我的通话?

java - 方法 actionIncation.getInvocalContext().getSession() 不返回任何内容