java - 使用状态处理计算 Apache Beam 中的增量

标签 java apache-beam

我有一堆输入流,可能会随着时间的推移发送更新。如果发生更新,我需要计算增量以便能够进一步处理它。简而言之:

Input: 10 -> State: 10, Output: 10
Input: 12 -> State: 12, Output:  2
Input:  5 -> State:  5, Output: -7

我读到stateful processingtimely processing为了了解如何在 Apache Beam 应用程序中处理这种状态,但我不明白的是:

  1. 能否 100% 保证我的有状态 DoFn 不会并行处理具有相同 key 的项目?
  2. 我想确保当我的应用程序重新启动或失败时状态保持不变,以便我可以从正确的初始值开始。如何确保我的 DoFn 在关闭之前“清理”(保留到数据存储区)?

对于#2,我想知道这在使用全局窗口时是否可行:

public class Delta extends DoFn<KV<String, Integer>, Integer> {
    @StateId("state")
    private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();

    @TimerId("timer")
    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(ProcessContext context,
                        BoundedWindow window,
                        @StateId("state") ValueState<Integer> state,
                        @TimerId("timer") Timer myTimer) {
        // Assign the timer to the end of the current window, which is a global window
        // Not sure if this always triggers when the application stops...
        myTimer.set(window.maxTimestamp());

        int value = context.element().getValue();
        int acc = getOrInitialize(state.read());
        int delta = value - acc;
        state.write(value);
        context.output(delta);
    }

    @OnTimer("timer")
    public void onTimer(OnTimerContext context,
                        @StateId("state") ValueState<Integer> state) {
        // Persist value of state here
    }

    private int getOrInitialize(Integer a) {
        // Get initial value of state here
        return (a != null) ? a : 0;
    }
}

最佳答案

  1. 我认为如果不配置任何 BoundedWindow,您的计时器方法就无法工作。 @StartBundle/@Setup@FinishBundle 应该是恢复和检查点的更好位置。我不推荐 @Teardown 因为它不能保证被调用。

关于java - 使用状态处理计算 Apache Beam 中的增量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51789776/

相关文章:

java - 映射代码以过滤掉日志文件中每一行的某些字段

java - Jenkins 中带有 Sonar 的 JaCoCo 代码覆盖率引擎

java - 从 ElasticsearchIO 等待 [10000] 毫秒后获取监听器超时

python - Apache Beam 中的并行度

java - 将外部类与数据流结合使用

java - gradle 使用自己的依赖项而不是 commons-cli 的 build.gradle

java - stripe.confirmCardPayment 和 java

java - 寻求服务定位器模式实现方面的帮助

python - 如何在 python Beam 中制作通用的 Protobuf 解析器 DoFn?

java - Apache Beam/Dataflow ReShuffle 已弃用,该使用什么替代?