java - Spark 中的并发作业执行

标签 java multithreading apache-spark hadoop-yarn

我使用了以下格式的输入数据:

0
1
2
3
4
5
…
14

Input Location: hdfs://localhost:9000/Input/datasource

我使用以下代码片段使用多线程将 RDD 保存为文本文件:

package org.apache.spark.examples;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

class RunnableDemo implements Runnable
{

    private Thread t;
    private String threadName;
    private String path;
    private JavaRDD<String> javaRDD;
//  private JavaSparkContext javaSparkContext;

    RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path)
    {
        this.threadName=threadName;
        this.javaRDD=javaRDD;
        this.path=path;
//      this.javaSparkContext=javaSparkContext;
    }


    @Override
    public void run() {
        System.out.println("Running " +  threadName );      
        try {
            this.javaRDD.saveAsTextFile(path);
//          System.out.println(this.javaRDD.count());
            Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.println("Thread " +  threadName + " interrupted.");
                }
         System.out.println("Thread " +  threadName + " exiting.");
//       this.javaSparkContext.stop();
    }

    public void start ()
       {
          System.out.println("Starting " +  threadName );
          if (t == null)
          {
             t = new Thread (this, threadName);
             t.start ();
          }
       }

}

public class SparkJavaTest {



    public static void main(String[] args) {

        //Spark Configurations:

        SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest");

        JavaSparkContext ctx=new JavaSparkContext(sparkConf);

        SQLContext sqlContext = new SQLContext(ctx);        

        JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource");




        List<StructField> fields= new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true));

        JavaRDD<Row> rowRDD =dataCollection.map(
                new Function<String, Row>() {
                    @Override
                    public Row call(String record) throws Exception {
                        String[] fields = record.split("\u0001");                       
                        return RowFactory.create(Integer.parseInt(fields[0].trim()));
                    }                   
                });     

        StructType schema = DataTypes.createStructType(fields);

        DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);        
        dataFrame.registerTempTable("data");

        long recordsCount=dataFrame.count();        
        long splitRecordsCount=5;
        long splitCount =recordsCount/splitRecordsCount;
        List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>();

        for(int i=0;i<splitCount;i++)
        {
            long start = i*splitRecordsCount;
            long end = (i+1)*splitRecordsCount;         
            DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);         
            list1.add(temp.toJavaRDD());
        }       

        long length =list1.size();

        int split=0;

        for (int i = 0; i < length; i++) {

            JavaRDD rdd1 =list1.get(i);

            JavaPairRDD rdd3=rdd1.cartesian(rdd1);

            JavaPairRDD<Row,Row> rdd4=rdd3.filter(
                    new Function<Tuple2<Row,Row>,Boolean>()
                    {
                        public Boolean call(Tuple2<Row,Row> s)
                        {
                            Row line1=s._1;
                            Row line2=s._2;

                            long app1 = Integer.parseInt(line1.get(0).toString());

                            long app2 = Integer.parseInt(line2.get(0).toString());


                            if(app1<app2)
                            {
                                return true;
                            }
                            return false;
                        }
                    });

            JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
                @Override
                public String call(Tuple2<Row, Row> s)
                        throws Exception {

                    Row data1=s._1;
                    Row data2=s._2;

                    int x =Integer.parseInt(data1.get(0).toString());
                    int y =Integer.parseInt(data2.get(0).toString());

                    String result =x +","+ y+","+(x+y);
                    return result;
                }
            });

            RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);

            R.start();
            split++;            
            R.start();

            int index =i;

            while(index<length)
            {
                JavaRDD rdd2 =list1.get(index);
                 rdd3=rdd1.cartesian(rdd2);

                 rdd4=rdd3.filter(
                        new Function<Tuple2<Row,Row>,Boolean>()
                        {
                            public Boolean call(Tuple2<Row,Row> s)
                            {
                                Row line1=s._1;
                                Row line2=s._2;

                                long app1 = Integer.parseInt(line1.get(0).toString());

                                long app2 = Integer.parseInt(line2.get(0).toString());


                                if(app1<app2)
                                {
                                    return true;
                                }
                                return false;
                            }
                        });         

                test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
                    @Override
                    public String call(Tuple2<Row, Row> s)
                            throws Exception {

                        Row data1=s._1;
                        Row data2=s._2;

                        int x =Integer.parseInt(data1.get(0).toString());
                        int y =Integer.parseInt(data2.get(0).toString());

                        String result =x +","+ y+","+(x+y);
                        return result;
                    }
                });         

                R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);

                R.start();
                split++;            
                index++;                
            }
        }
    }

}

在这种情况下,我遇到了以下异常

enter image description here

我已经尝试了以下链接中提供的解决方案

How to run concurrent jobs(actions) in Apache Spark using single spark context

但是,我仍然无法解决这个问题。

你能指导我解决这个问题吗?

最佳答案

首先,您尝试使用多个线程在驱动程序节点上执行所有工作。这并不真正符合 Spark 的精神,因为您案例中的每个工作单元都独立于其他单元,并且可以在不同的机器上执行。您在这里有一个玩具示例,但这对于大量数据将变得非常重要。

更好的方法是使用类似 mapPartitions 的方法将键的范围发送给每个工作人员,让他们执行相应的 SQL 查询,然后保存结果,每个工作人员一个线程。这将使代码更清晰且更易于推理(一旦您习惯了 RDD 的工作方式)。您显然需要设置 level of parallelism和适当的输入数据的分区数(讨论了 here )。

您的代码的直接问题是主线程启动其他线程,但不等待它们完成。通常这会导致生成的线程与父线程一起终止(参见 javadoc )。注意 answer 中的方式对于链接的问题,主函数在返回之前对生成的 future 执行 get()

关于java - Spark 中的并发作业执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30862956/

相关文章:

java - rdd.saveAsTextFile() 未处理异常

C++新线程休眠主线程

Java Bitset - “Not Thread-Safe” 的完整含义是什么

python - 如何使用 PySpark 执行一次热编码

java - 如何搜索 SQL 数据库并将其显示在 JTable 上

Java JTextPane + JScrollPane : de/activate automatic scrolling

java - 我需要帮助,在使用 JFrames 和 DrawWindows 时不要使用暴力

Java - 有没有一种方法可以让客户端和服务器通过 IPv4 连接而无需端口转发?

c - 提供以下程序的 4 种可能结果

apache-spark - 将 RDD 对以特定格式保存在输出文件中