在我的管道设置中,我看不到 session 窗口的侧面输出。我使用的是 Flink 1.9.1
版本 1。 我所拥有的是这样的:
messageStream.
.keyBy(tradeKeySelector)
.window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
lateTradeMessages 实现 SessionWindowTimeGapExtractor 并返回 5 秒。
此外我还有这个:
messageStream.getSideOutput(lateTradeMessages)
.keyBy(tradeKeySelector)
.process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
@Override
public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
System.out.println("Process Late messages For Aggregation");
out.collect(new Transaction());
}
})
.name("Process Late messages For Aggregation");
问题是,当我使用相同的 key 发送应该错过窗口时间的消息时,我从未看到“处理延迟消息以进行聚合”。
当 session 窗口过去并且我“立即”为同一 key 发送一条新消息时,它会触发新的 session 窗口,而不会进入 Late SideOutput。
不确定我在这里做错了什么。
我想在这里实现的是捕捉“迟到的事件”并尝试 重新处理它们。
我将不胜感激任何帮助。
<小时/>版本 2,@Dominik Wosiński 评论后:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
DataStream<RawMessage> rawBusinessTransaction = env
.addSource(new FlinkKafkaConsumer<>("business",
new JSONKeyValueDeserializationSchema(false), properties))
.map(new KafkaTransactionObjectMapOperator())
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
return element.messageCreationTime;
}
})
.name("Kafka Transaction Raw Data Source.");
messageStream
.keyBy(tradeKeySelector)
.window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
水印正在进行中,我已经检查了 Flink 的指标。 Window 运算符正在执行,但仍然没有 Late Outputs。
顺便说一句,Kafka 主题可能闲置,所以我必须定期发出新的 WaterMark。
<小时/>最佳答案
水印方法对我来说看起来非常可疑。通常,您会在此时输出最新的事件时间戳。
只是一些背景信息,以便更容易理解。
延迟事件是指在水印处理之后到该事件之后的某个时间发生的事件。考虑以下示例:
event1 @time 1
event2 @time 2
watermark1 @time 3
event3 @time 1 <-- late event
event4 @time 4
您的水印方法几乎会将所有过去的事件渲染为迟到的事件(由于 1 秒水印间隔,所以有一点容忍度)。这也将使重新处理和追赶变得不可能。
但是,您实际上没有看到任何迟到的事件,这对我来说更令人惊讶。您能否仔细检查您的水印方法,描述您的用例并提供示例数据?很多时候,实现对于实际用例并不理想,应该以不同的方式解决。
关于java - Flink session 窗口的延迟输出缺失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59570445/