我正在使用 DirectAPI 在 yarn 上运行 Spark 流 (1.6.1) 以从具有 50 个分区的 Kafka 主题读取事件并在 HDFS 上写入。我的批处理间隔为 60 秒。我收到了大约 50 万条消息,这些消息在 60 秒内得到处理。
突然,spark 开始接收 15-2000 万条消息,处理时间大约为 5-6 分钟,批处理间隔为 60 秒。我已经配置了 "spark.streaming.concurrentJobs=4"
。
因此,当批处理需要很长时间来处理时,spark 启动并发 4 个事件任务来处理积压批处理,但仍然在一段时间内批处理积压增加,因为批处理间隔对于这样的数据量来说太小了。
我对此几乎没有怀疑。
'saveAsTextFile'
Action 每批只调用一次。来自所有文件 50 个部分文件的总记录约为 330 万。 我已经配置了
'spark.streaming.kafka.maxRatePerPartition=50'
& 'spark.streaming.backpressure.enabled=true'
。
最佳答案
我认为可能让您感到困惑的一件事是工作长度与频率之间的关系。
根据您的描述,在可用资源的情况下,最终这项工作似乎需要大约 5 分钟才能完成。但是,您的批处理频率为 1 分钟。
因此,每 1 分钟您就会启动一些需要 5 分钟才能完成的批次。
结果,最后您会期望看到 HDFS 在最初的几分钟内什么也没有收到,然后您每隔 1 分钟就会收到一些信息(但是从数据输入开始有 5 分钟的“延迟”)。
关于hadoop - Spark 流 : Issues when processing time > batch time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41184154/