java - 如何提高 Spark 性能?

标签 java apache-spark cassandra hdfs spark-cassandra-connector

我有一个可以处理大型数据集的 Java 程序。数据集存储在 hdfs (csv) 中。

程序运行良好,但速度很慢。

程序做什么:

  1. 载入csv文件
  2. String[] 单独一行
  3. 过滤字符串数组
  4. 映射到 MyObject
  5. 将 MyObject 保存到 Cassandra

这是我的主要方法:

public static void main(String[] args) {

        // configure spark
        SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
                .setMaster("local[*]")
                .set("spark.executor.memory", "4g");

        if (args.length > 1)
            sparkConf.set("spark.cassandra.connection.host", args[1]);

        // start a spark context
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // read text file to RDD
        JavaRDD<String> lines = sc.textFile(args[0]);

        JavaRDD<MyObject> myObjectJavaRDD = lines
                .map(line -> line.split(","))
                .filter(someFilter)
                .map(MyObject::new);

        javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
    }

我怎样才能提高性能?

感谢您的回答。

最佳答案

您的代码没有随机播放问题(除非您必须写出到 HDFS)并且默认分区由输入格式定义,在 Hadoop 上由 HDFS 核心分割,过滤器或映射不会更改分区。如果你能先过滤,你会看到一些改进

        JavaRDD<MyObject> myObjectJavaRDD = lines
                .filter(someFilter)
                .map(line -> line.split(","))
                .map(MyObject::new);

Spark 只能为 RDD 的每个分区运行 1 个并发任务,最多为 集群中的核心。所以如果你有一个有 50 个核心的集群,你希望你的 RDD 至少 有50个分区。至于选择“好”数量的分区,您通常至少需要与 并行执行器的数量。您可以通过调用获得此计算值

sc.defaultParallelism

或检查RDD分区数

someRDD.partitions.size

通过使用读取文件创建RDD时

rdd = SparkContext().textFile("hdfs://…/file.txt") 

分区数可能会少一些。理想情况下,你会得到相同的 您在 HDFS 中看到的 block 数,但是如果文件中的行太长(长于 block 大小),将会有更少的分区。

为RDD设置分区数的首选方法是直接将其作为 调用中的第二个输入参数

rdd = sc.textFile("hdfs://… /file.txt", 400) 

其中 400 是分区数。在这种情况下,分区会进行 400 次拆分 由 Hadoop 的 TextInputFormat 完成,而不是 Spark,它会工作得更快。它的 此外,代码会产生 400 个并发任务以尝试将 file.txt 直接加载到 400 分区。

重新分区:增加分区,过滤后重新平衡分区增加并行度

        repartition(numPartitions: Int)

合并:在输出到 HDFS/外部之前减少分区而不进行混洗合并

    coalesce(numPartitions: Int, suffle: Boolean = false)

最后,同样重要的是,您可以使用不同的值和基准进行一些试验,以查看该过程花费了多少时间

  val start = System.nanoTime()

  // my process

  val end = System.nanoTime()

  val time = end - start
  println(s"My App takes: $time")

希望对你有帮助

关于java - 如何提高 Spark 性能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61961625/

相关文章:

java - titan 图数据库中并发更新的问题

java - Datastax java 驱动程序 4.0 以编程方式配置

scala - Spark SQL + Cassandra : bad performance

java - 如何解决 "itemTag cannot be resolved or is not a field"?

java - 如何解决这个: "Error occurred during initialization of VM; Could not reserve enough space for object heap"

java - NoClassDefFound : Scala/xml/metadata

scala - 我如何从 pyspark 访问 couchbase

java - 如何从java移动应用程序上传更大的文件?

java - Spring ldap 身份验证失败错误代码

scala - 如何使用 Spark/Scala 中另一列的分隔符拆分一列