我知道状态作为状态存储保存在检查点位置。 但我不知道它还在内存中,存储在哪里?
我创建了一个使用mapGroupsWithState的Streaming作业,但我看到执行器使用的存储内存为0。
这是否意味着状态存储在执行内存中? 我无法知道状态消耗的内存量。不知道如何知道我是否需要增加执行器内存!
此外,是否可以完全避免状态检查点并将状态始终保留在内存中?
最佳答案
由于 mapGroupsWithState 是一个聚合,因此它将存储在 Spark 应用程序生命周期内所有聚合都保存的位置:在执行内存中(正如您已经假设的那样)。
查看方法的签名
def mapGroupsWithState[S: Encoder, U: Encoder](
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
您会注意到S
是用户定义状态的类型。这就是管理状态的地方。
由于这将被发送到执行器,因此它必须可编码为 Spark SQL 类型。因此,您通常会在 Scala 中使用案例类,或者在 Java 中使用 Bean。 GroupState
是一个类型化包装对象,它提供访问和管理状态值的方法。
作为开发人员,您还必须注意如何从此状态中删除数据,这一点至关重要。否则,你的状态将不可避免地导致 OOM,因为它只会增长而不会收缩。
如果您未在结构化流中启用检查点,则不会存储任何内容,但缺点是在失败期间会丢失状态。如果您启用了检查点,例如为了跟踪输入源,Spark 还将当前状态存储到检查点位置。
关于apache-spark - MapGroupsWithState 的 Spark 结构化流状态存储在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66369089/