假想问题
我的Poorman 的概念解决方案
使用 SparkSQL & Databricks spark-csv
$ ./spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
val dfBigLog = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load("/media/username/myUSBdrive/bogusBigLog1TB.log")
dfBigLog.select("Country")
.groupBy("Country")
.agg(count($"Country") as "CountryCount")
.orderBy($"CountryCount".desc).show
问题一:Spark如何并行处理?
我想上述解决方案的大部分执行时间(99%?)是将 1TB 文件从 USB 驱动器读取到 Spark 集群中。从 USB 驱动器读取文件不可并行化。但是在读取了整个文件之后,Spark 在后台做了什么来并行化处理?
groupBy
& count
?假设有 100 多个国家/地区(但 Spark 还不知道)。 Spark 如何分区以在 20 个节点上分配 100 多个国家/地区值? 问题 2:如何使 Spark 应用程序尽可能快?
我想改进的领域是并行读取 1TB 文件。
但是 Spark 如何设法使用所有 20 个节点来创建 DataFrame 和处理(
groupBy
和 count
)? Spark 是否每次都使用所有节点?
最佳答案
Question 1: How does Spark parallelize the processing (of reading a file from a USB drive)?
这种情况是不可能的。
Spark 依赖于符合 hadoop 的文件系统来读取文件。挂载U盘时,只能从本地主机访问。试图执行
.load("/media/username/myUSBdrive/bogusBigLog1TB.log")
将在集群配置中失败,因为集群中的执行程序将无法访问该本地路径。
可以在本地模式 (
master=local[*]
) 下使用 Spark 读取文件,在这种情况下,您将只有 1 个主机,因此其余问题将不适用。Question 2: How to make the Spark application the fastest possible?
分而治之。
问题中概述的策略很好。使用 Parquet 将允许 Spark 对数据进行投影,并且只有
.select("Country")
列,进一步减少需要摄取的数据量,从而加快速度。Spark 并行性的基石是分区。同样,当我们从文件中读取数据时,Spark 依赖于 Hadoop 文件系统。从 HDFS 读取时,分区将由 HDFS 上的文件拆分决定。这些分割将在执行者之间平均分配。这就是 Spark 最初将工作分配给作业的所有可用执行器的方式。
我对 Catalist 优化不是很熟悉,但我想我可以假设
.groupBy("Country").agg(count($"Country")
将变成类似于:rdd.map(country => (country,1)).reduceByKey(_+_)
map操作不会影响分区,可以现场应用。reduceByKey 将首先在每个分区本地应用,部分结果将在驱动程序上组合。所以大多数计数发生在集群中,并且将其加起来是集中的。
关于apache-spark - Spark 如何并行处理 1TB 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36522417/