Apache Beam 最近推出了 state cells ,通过 StateSpec
和 @StateId
注释,部分支持 Apache Flink 和 Google Cloud Dataflow。
我找不到任何文档来说明当它与 GlobalWindow
一起使用时会发生什么。特别是,有没有一种方法可以拥有“状态垃圾收集”机制来根据某些配置摆脱一段时间未见的键的状态,同时仍然保持可见的键的单一历史状态够频繁吗?
或者,在这种情况下使用的状态量是否会发生分歧,无法回收与一段时间未见的键对应的状态?
我还对 Apache Flink 或 Google Cloud Dataflow 是否支持潜在解决方案感兴趣。
Flink 和 direct runners 似乎有一些“状态 GC”的代码,但我不太确定它的作用以及它在使用全局窗口时是否相关。
最佳答案
状态可以在窗口过期后的某个时刻由 Beam runner 自动垃圾收集 - 当输入水印超出窗口末尾允许的延迟时,因此所有进一步的输入都是可丢弃的。具体细节取决于运行者。
正如您正确确定的那样,全局窗口可能永远不会过期。那么这个状态的自动收集就不会被调用。对于有界数据,包括 drain 场景,它实际上会过期,但对于永久无界数据源,它不会。
如果您在全局窗口中对此类数据进行状态处理,您可以使用用户定义的计时器(通过 @TimerId
、@OnTimer
和 使用TimerSpec
- 我还没有写过关于这些的博客)以在您选择的超时后清除状态。如果状态代表某种聚合,那么无论如何您都需要一个计时器来确保您的数据不会滞留在状态中。
这是一个简单的使用示例:
new DoFn<Foo, Baz>() {
private static final String MY_TIMER = "my-timer";
private static final String MY_STATE = "my-state";
@StateId(MY_STATE)
private final StateSpec<ValueState<Bizzle>> =
StateSpec.value(Bizzle.coder());
@TimerId(MY_TIMER)
private final TimerSpec myTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(
ProcessContext c,
@StateId(MY_STATE) ValueState<Bizzle> bizzleState,
@TimerId(MY_TIMER) Timer myTimer) {
bizzleState.write(...);
myTimer.setForNowPlus(...);
}
@OnTimer(MY_TIMER)
public void onMyTimer(
OnTimerContext context,
@StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
context.output(... bizzleState.read() ...);
bizzleState.clear();
}
}
关于google-cloud-dataflow - 带有 GlobalWindow 的 Beam 中的状态垃圾收集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42773020/