scala - 使用hadoop和spark在Azure上使用WordCount

标签 scala azure hadoop apache-spark azure-hdinsight

我必须在Azure的Hdinsight群集上运行一个简单的单词计数。我已经使用hadoop和spark创建了一个集群,并且我已经有了包含代码的jar文件,但我不知道如何设置集群以及在Azure上启动Spark的正确代码行的问题,我想尝试不同的节点组合(workers,2-4-8)来查看程序的扩展方式。

每次我在yarn-client模式下以spark-submit方式启动应用程序时,它都能正常工作,但始终使用2个执行器和1个核心在3分钟左右的时间内输入1gb输入文本文件,如果我设置了更多的执行器和更多的核心,他也会进行设置,他不使用它,所以我认为问题出在RDD上,它没有以正确的模式拆分输入文件,因为它仅创建了从2个工作节点开始的2个任务,而其他节点保持不 Activity 状态。

用sbt包创建的jar文件。

启动Spark的命令:

spark-submit --class "SimpleApp" --master yarn-client --num-executors 2 simpleapp_2.10-1.0.jar

字数代码:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import java.io._
import org.apache.hadoop.fs
import org.apache.spark.rdd.RDD


object SimpleApp {
def main(args: Array[String]){
//settingsparkcontext
val conf = new SparkConf().setAppName("SimpleApp")
val sc = new SparkContext(conf)
//settingthewordtosearch
val word = "word"
//settingtime
val now = System.nanoTime
//settingtheinputfile
val input = sc.textFile("wasb://xxx@storage.blob.core.windows.net/dizionario1gb.txt")

//wordlookup
val splittedLines = input.map(line=>line.split(""))
val find = System.nanoTime
val tot = splittedLines.map(x => x.equals(word)).count()
val w=(System.nanoTime-find)/1000000
val rw=(System.nanoTime-now)/1000000
//reportingtheresultofexecutioninatxtfile
val writer = new FileWriter("D:\\Users\\user\\Desktop\\File\\output.txt",true)
try {
writer.write("Word found "+tot+" time total "+rw+" mstimesearch "+w+" time read "+(rw-w)+"\n")
}
finally writer.close()
//terminatingthesparkserver
sc.stop()
}}              

最佳答案

并行度

“除非为每个操作设置足够高的并行度,否则群集将不会得到充分利用。Spark会根据文件的大小自动设置要在每个文件上运行的“映射”任务的数量(尽管您可以通过SparkContext的可选参数进行控制) .textFile等),您可以将并行级别作为第二个参数传递(请参见spark.PairRDDFunctions文档),或设置config属性spark.default.parallelism来更改默认值。通常,我们建议每个2-3个任务集群中的CPU核心。”

来源:
https://spark.apache.org/docs/1.3.1/tuning.html

关于scala - 使用hadoop和spark在Azure上使用WordCount,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33763251/

相关文章:

scala - 在scala中实现长轮询并用akka玩2.0

azure - 如何调试 log4net 不在 Azure 应用服务上写入

azure - 设置 Windows 服务总线 1.1 管理门户

hadoop - 是否可以使用Google的Dataflow运行Hadoop MR作业?

apache - Storm ui错误:org.apache.storm.utils.NimbusLeaderNotFoundException:从种子主机[localhost]中找不到领导者雨云

search - 使用 hadoop 进行日志搜索

Scala-GWT 状态

scala - 缺少序列化程序时,密封特征和对象枚举的快速 json4s 序列化

algorithm - Scala 代码中的性能问题 - O(nlgn) 比 O(n) 快

linux - 如何使用位于 azure webapp 中的 .sh 文件将多个文件从本地存储上传到 azure blob 存储