我有一堆输入流,可能会随着时间的推移发送更新。如果发生更新,我需要计算增量以便能够进一步处理它。简而言之:
Input: 10 -> State: 10, Output: 10
Input: 12 -> State: 12, Output: 2
Input: 5 -> State: 5, Output: -7
我读到stateful processing和 timely processing为了了解如何在 Apache Beam 应用程序中处理这种状态,但我不明白的是:
- 能否 100% 保证我的有状态 DoFn 不会并行处理具有相同 key 的项目?
- 我想确保当我的应用程序重新启动或失败时状态保持不变,以便我可以从正确的初始值开始。如何确保我的 DoFn 在关闭之前“清理”(保留到数据存储区)?
对于#2,我想知道这在使用全局窗口时是否可行:
public class Delta extends DoFn<KV<String, Integer>, Integer> {
@StateId("state")
private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(ProcessContext context,
BoundedWindow window,
@StateId("state") ValueState<Integer> state,
@TimerId("timer") Timer myTimer) {
// Assign the timer to the end of the current window, which is a global window
// Not sure if this always triggers when the application stops...
myTimer.set(window.maxTimestamp());
int value = context.element().getValue();
int acc = getOrInitialize(state.read());
int delta = value - acc;
state.write(value);
context.output(delta);
}
@OnTimer("timer")
public void onTimer(OnTimerContext context,
@StateId("state") ValueState<Integer> state) {
// Persist value of state here
}
private int getOrInitialize(Integer a) {
// Get initial value of state here
return (a != null) ? a : 0;
}
}
最佳答案
- 是
- 我认为如果不配置任何
BoundedWindow
,您的计时器方法就无法工作。@StartBundle
/@Setup
和@FinishBundle
应该是恢复和检查点的更好位置。我不推荐@Teardown
因为它不能保证被调用。
关于java - 使用状态处理计算 Apache Beam 中的增量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51789776/