scala - 为什么使用 updateStateByKey 时任务大小一直在增长?

标签 scala memory-leaks apache-spark spark-streaming

我编写了一个与 updateStateByKey 一起使用的简单函数,以查看问题是否是因为我的 updateFunc。我认为这一定是由于其他原因。我在 --master local[4] 上运行它。

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(1)
}

val state = test.updateStateByKey[Int](updateFunc)

一段时间后,有警告,任务大小不断增加。

WARN TaskSetManager:阶段 x 包含一个非常大的任务 (129 KB)。建议的最大任务大小为 100 KB。

WARN TaskSetManager:阶段 x 包含一个非常大的任务 (131 KB)。建议的最大任务大小为 100 KB。

最佳答案

您的流中有越来越多的不同键,每个键都会将 1 的新副本添加到您的状态。

当前 updateStateByKey 在每个批处理间隔中扫描每个键,即使该键没有数据。这会导致 updateStateByKey 的批处理时间随着状态中键的数量而增加,即使数据速率保持固定

有一个proposal to solve this .

关于scala - 为什么使用 updateStateByKey 时任务大小一直在增长?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26694235/

相关文章:

map 上的 Scala 类型不匹配错误

compiler-construction - 在这个 Scala 代码中 compare 与 compareTo 有何不同?

java - Android Admob Interstitial 内存泄漏

hadoop - Spark 在 hdfs 中只读

python - Spark DF pivot error : Method pivot([class java. lang.String, class java.lang.String]) 不存在

scala - 并行执行测试

scala - Spark UDF 线程安全

delphi - 使用动态数组时 Linux 版 RAD 服务器上的内存泄漏

java - sun.font.TrueTypeFont 内存泄漏?

apache-spark - spark 如何选择 cassandra 节点进行读取?