java - 通过与 kafka-streams 的连接批量处理数据导致 `Skipping record for expired segment`

标签 java apache-kafka apache-kafka-streams

通过 kafka-steams 应用推送批量数据时,我看到它多次记录以下消息......WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment....我希望通过 leftJoin 步骤加入的数据似乎丢失了。
当我的应用程序关闭一段时间然后重新启动时,或者当我使用类似 app-reset-tool 的东西时,我都在实践中看到过这种情况。试图让应用程序重新处理过去的数据。
我能够通过向间隔一小时的两个主题生成 1000 条消息(按原始时间戳顺序)生成 1000 条消息,然后让 kafka 流为它们选择一个 key 并尝试 leftJoin 重新生成两个重新生成 key 的流,从而能够单独重现这种行为。
该复制品的自包含源代码可在 https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java 获得。
那里的实际 kafka-streams 拓扑如下所示。

            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> leftStream = builder.stream(leftTopic);
            final KStream<String, String> rightStream = builder.stream(rightTopic);

            final KStream<String, String> rekeyedLeftStream = leftStream
                    .selectKey((k, v) -> v.substring(0, v.indexOf(":")));

            final KStream<String, String> rekeyedRightStream = rightStream
                    .selectKey((k, v) -> v.substring(0, v.indexOf(":")));

            JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

            final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
                    rekeyedRightStream,
                    (left, right) -> left + "/" + right,
                    joinWindow
            );
......我产生的最终输出看起来像这样......
...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
...其中,给定输入数据,我希望看到每一行都以两个值连接结束,而不是正确的值为空。
(请注意,我们最初获得每个值的左/空值是好的/预期 - 这是我理解的 kafka-streams left join 的预期语义。)
我注意到,如果我在连接窗口上设置一个非常大的宽限值,问题就解决了,但是由于我提供的输入没有乱序,我没想到需要这样做,而且我厌倦了资源要求在具有大量容量的应用程序上在实践中这样做。
我怀疑发生了一些事情,当处理一个分区时,它会导致流时间被推送到该分区中的最新消息,这意味着当检查下一个分区时,发现它包含许多“太”的记录旧'与流时间相比。但是,我希望有人可以向我指出更改该行为的设置,或其他一些解决方案,以避免在应用程序通过积压数据追赶时产生不准确的结果,而不会造成很大的性能开销。

最佳答案

你是发布消息空间 1 小时明显,然后它无法加入?
现在您正在使用:

JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
将其更改为更高的数字或增加宽限期将允许您处理更多的消息,消息相隔 1 小时并且您有 1000 条消息,因此值:
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofDays(42));
需要。 (因为 1000 * 5h 接近 42 天)
因此,您需要根据数据大小调整此值,以便始终能够对您期望的所有消息执行此操作。
然后我得到你期望的结果......我想是的。因为我不完全确定这里的其他空值,但你似乎说它是预期的 - 没有分析那部分。因为有些人确实拥有它,而其他人则没有。

 11 [11:left/null, 11:left/11:right]
 12 [12:left/12:right]
 13 [13:left/null, 13:left/13:right]
 14 [14:left/null, 14:left/14:right]
 15 [15:left/null, 15:left/15:right]
 16 [16:left/null, 16:left/16:right]
 17 [17:left/17:right]
 18 [18:left/null, 18:left/18:right]
 19 [19:left/null, 19:left/19:right]
 20 [20:left/null, 20:left/20:right]
 21 [21:left/null, 21:left/21:right]
 22 [22:left/null, 22:left/22:right]
 23 [23:left/null, 23:left/23:right]
 24 [24:left/null, 24:left/24:right]
 25 [25:left/25:right]
 26 [26:left/null, 26:left/26:right]
但是对于所有 1000 个结果总是有有效的配对预设。
您需要旧数据,因此您必须同意旧数据。

but as I understand it having a very large grace period would be costly


如果您的宽限期比您需要的大得多,这可能会很昂贵,但在这种情况下,它正是您需要的。除非你可以完全避免这样做。
正如您在文档中所看到的,grace 完全符合您的要求(或者更确切地说是您不想要的,其默认值较低):
https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#grace-java.time.Duration-

Reject late events that arrive more than afterWindowEnd after the end of its window. Lateness is defined as (stream_time - record_timestamp).


替代方案是使用更大的窗口,但这对于您的情况来说似乎不是正确的解决方案:
JoinWindows joinWindow = JoinWindows.of(Duration.ofDays(42);

关于java - 通过与 kafka-streams 的连接批量处理数据导致 `Skipping record for expired segment`,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69126351/

相关文章:

spring-boot - KafkaContainer - 如何在 start() 之后在 Spring Boot 中读取 kafka 容器端口作为属性/如何在启动之前配置 Kafka 端口

java - 如何使用 KStreams 将 Kafka 主题的数据写入文件?

java - 我在 TicTacToe 游戏中设置 'piece' 的方法不起作用

java - 正则表达式从组中删除空格

java - 为什么Kafka的seekToBeginning和seekToEnd不能与assign一起使用?

java - 如何在读取 kafka 主题时验证无效的分区名称

apache-kafka - 滑动窗口中Kafka KStream相关消息事件

apache-kafka - 如何修复与组协调器相关的 kafka 流问题不可用或无效,将尝试重新发现

java - Android 复制并粘贴到任何应用程序中的任何文本字段

java - 使用 guava Function 接口(interface) vs 引入新类型