apache-spark - MapGroupsWithState 的 Spark 结构化流状态存储在哪里?

标签 apache-spark spark-structured-streaming

我知道状态作为状态存储保存在检查点位置。 但我不知道它还在内存中,存储在哪里?

我创建了一个使用mapGroupsWithState的Streaming作业,但我看到执行器使用的存储内存为0。

这是否意味着状态存储在执行内存中? 我无法知道状态消耗的内存量。不知道如何知道我是否需要增加执行器内存!

此外,是否可以完全避免状态检查点并将状态始终保留在内存中?

最佳答案

由于 mapGroupsWithState 是一个聚合,因此它将存储在 Spark 应用程序生命周期内所有聚合都保存的位置:在执行内存中(正如您已经假设的那样)。

查看方法的签名

def mapGroupsWithState[S: Encoder, U: Encoder](
      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] 

您会注意到S是用户定义状态的类型。这就是管理状态的地方。

由于这将被发送到执行器,因此它必须可编码为 Spark SQL 类型。因此,您通常会在 Scala 中使用案例类,或者在 Java 中使用 Bean。 GroupState 是一个类型化包装对象,它提供访问和管理状态值的方法。

作为开发人员,您还必须注意如何从此状态中删除数据,这一点至关重要。否则,你的状态将不可避免地导致 OOM,因为它只会增长而不会收缩。

如果您未在结构化流中启用检查点,则不会存储任何内容,但缺点是在失败期间会丢失状态。如果您启用了检查点,例如为了跟踪输入源,Spark 还将当前状态存储到检查点位置。

关于apache-spark - MapGroupsWithState 的 Spark 结构化流状态存储在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66369089/

相关文章:

apache-spark - Spark 结构化流中的多重聚合

apache-spark - Spark Structured Streaming 应用程序没有工作和阶段

scala - 无法解析带有流源的查询必须使用 writeStream.start() Scala 执行

scala - 为什么对不存在(未选择)的列进行过滤有效?

apache-spark - 使用HDFS在Spark Graphx中存储图形

scala - 斯帕克斯卡拉 : iterable to individual key-value pairs

apache-spark - 如何在Spark结构化流中将两个流df写入MySQL中的两个不同表中?

java - 如何根据给定分区过滤 RDD?

dataframe - 如何在 Spark Dataframe 中有效地添加多个列

apache-spark - 带有 Kafka SASL/PLAIN 身份验证的 Spark 结构化流