java - Flink session 窗口的延迟输出缺失

标签 java stream apache-flink flink-streaming

在我的管道设置中,我看不到 session 窗口的侧面输出。我使用的是 Flink 1.9.1

版本 1。 我所拥有的是这样的:

messageStream.
    .keyBy(tradeKeySelector)
    .window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
    .sideOutputLateData(lateTradeMessages)
    .process(new CumulativeTransactionOperator())
    .name("Aggregate Transaction Builder");

lateTradeMessages 实现 SessionWindowTimeGapExtractor 并返回 5 秒。

此外我还有这个:

messageStream.getSideOutput(lateTradeMessages)
  .keyBy(tradeKeySelector)
  .process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
     @Override
     public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
                   System.out.println("Process Late messages For Aggregation");
                   out.collect(new Transaction());
              }
       })
   .name("Process Late messages For Aggregation");

问题是,当我使用相同的 key 发送应该错过窗口时间的消息时,我从未看到“处理延迟消息以进行聚合”。

当 session 窗口过去并且我“立即”为同一 key 发送一条新消息时,它会触发新的 session 窗口,而不会进入 Late SideOutput。

不确定我在这里做错了什么。

我想在这里实现的是捕捉“迟到的事件”并尝试 重新处理它们。

我将不胜感激任何帮助。

<小时/>

版本 2,@Dominik Wosiński 评论后:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
        env.setParallelism(1);
        env.disableOperatorChaining();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);


DataStream<RawMessage> rawBusinessTransaction = env
                .addSource(new FlinkKafkaConsumer<>("business",
                        new JSONKeyValueDeserializationSchema(false), properties))
                .map(new KafkaTransactionObjectMapOperator())
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(System.currentTimeMillis());
                    }

                    @Override
                    public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                        return element.messageCreationTime;
                    }
                })
                .name("Kafka Transaction Raw Data Source.");

messageStream
             .keyBy(tradeKeySelector)
             .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
             .sideOutputLateData(lateTradeMessages)
             .process(new CumulativeTransactionOperator())
             .name("Aggregate Transaction Builder");

水印正在进行中,我已经检查了 Flink 的指标。 Window 运算符正在执行,但仍然没有 Late Outputs。

顺便说一句,Kafka 主题可能闲置,所以我必须定期发出新的 WaterMark。

<小时/>

最佳答案

水印方法对我来说看起来非常可疑。通常,您会在此时输出最新的事件时间戳。

只是一些背景信息,以便更容易理解。

延迟事件是指在水印处理之后到该事件之后的某个时间发生的事件。考虑以下示例:

event1 @time 1
event2 @time 2
watermark1 @time 3
event3 @time 1 <-- late event
event4 @time 4

您的水印方法几乎会将所有过去的事件渲染为迟到的事件(由于 1 秒水印间隔,所以有一点容忍度)。这也将使重新处理和追赶变得不可能。

但是,您实际上没有看到任何迟到的事件,这对我来说更令人惊讶。您能否仔细检查您的水印方法,描述您的用例并提供示例数据?很多时候,实现对于实际用例并不理想,应该以不同的方式解决。

关于java - Flink session 窗口的延迟输出缺失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59570445/

相关文章:

java - JSF : Elements of ArrayList are not rendered properly?

java - 为什么在java中的Float数据类型中存储一个本质上是整数的文字值,最后没有 'f'不会给出错误?

c++ - 我需要一种流式传输字符串的替代方法。 C++

java - Flink JobExecutionException : akka. client.timeout

apache-flink - 如何在运行时配置 flink 作业?

java - 当您不知道它们叫什么时在 Struts Action 中获取请求参数

Java 正则表达式 unicode 支持?

IStream的C#IStream实现

Java:FilterInputStream相比其他流有什么优势和用途

apache-flink - Hazelcast Jet 和 Apache Flink 有什么区别