hadoop - 如何在Spark流中运行并发事件作业以及执行者之间的公平任务调度

标签 hadoop apache-spark spark-streaming yarn

我在 yarn 上使用Spark Streaming,面临以下问题。

问题1:

我在 yarn 上使用 Spark 流(1.6.1),我总是看到 Activity 作业计数为1,这意味着一次仅运行1个作业。我使用了“--conf spark. streaming. concurrentJobs=3”参数,但是运气不好,我总是只能看到1个 Activity 作业。

enter image description here

问题2:

我有50个Kafka分区,星火流创建了50个RDD分区,但是我可以看到95%的任务只分配给1个执行器,其余的执行器通常始终具有零 Activity 任务。

enter image description here

我的Spark提交命令如下:

spark-submit \
--verbose \
--master yarn-cluster \
--num-executors 3  \
--executor-memory 7g \
--executor-cores 3 \
--conf spark.driver.memory=1024m  \
--conf spark.streaming.backpressure.enabled=false \
--conf spark.streaming.kafka.maxRatePerPartition=3 \
--conf spark.streaming.concurrentJobs=3 \
--conf spark.speculation=true \
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
--files kafka_jaas.conf#kafka_jaas.conf,user.headless.keytab#user.headless.keytab \
--driver-java-options "-Djava.security.auth.login.config=./kafka_jaas.conf -Dhttp.proxyHost=PROXY_IP -Dhttp.proxyPort=8080 -Dhttps.proxyHost=PROXY_IP -Dhttps.proxyPort=8080 -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-driver.properties" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-executor.properties" \
--class com.spark.demo.StreamProcessor /home/user/demo.jar /tmp/data/out 30 KAFKA_BROKER:6667 "groupid" topic_name

最佳答案

--conf spark.streaming.kafka.maxRatePerPartition = 3

还有为什么每个分区的最大速率这么低?这意味着每个分区每秒仅处理3条记录!!!因此,如果微批处理间隔为30秒,并说您有3个分区,它将处理30 * 3 * 3,这是270条记录,似乎很低。

关于hadoop - 如何在Spark流中运行并发事件作业以及执行者之间的公平任务调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41034779/

相关文章:

hadoop - pig 过滤器不工作

java - 在 Hadoop Java API 中指定用户名和组?

hadoop - 了解 Hadoop 快照功能

serialization - 无法使用 spark streaming、cassandra 和 mllib 序列化的任务

elasticsearch - 从Kafka到Spark的流式传输到 Elasticsearch 索引

hadoop - 如何找到安装了hadoop的节点总数

apache-spark - Hadoop在Spark中的等效配置

performance - DStream 的分区(用于 updateStateByKey() )如何工作以及如何验证它?

apache-spark - Pyspark 读取包含 csv 的 7z 压缩文件

apache-spark - 将 mqtt 与 pyspark 流结合使用