apache-kafka - Apache 卡夫卡流: Out-of-Order messages

标签 apache-kafka timestamp extractor

我有一个 Apache Kafka 2.6 Producer,它写入主题 A (TA)。 我还有一个 Kafka 流应用程序,它从 TA 进行消费并写入主题 B (TB)。 在流应用程序中,我有一个自定义时间戳提取器,它从消息负载中提取时间戳。

对于我的一个故障处理测试用例,我在应用程序运行时关闭了 Kafka 集群。

当生产者应用程序尝试将消息写入 TA 时,它不能,因为集群已关闭,因此(我假设)缓冲了消息。 假设它按时间递增顺序接收 4 条消息 m1、m2、m3、m4。 (即 m1 是第一个,m4 是最后一个)。

当我使 Kafka 集群重新上线时,生产者将缓冲的消息发送到主题,但它们不按顺序排列。例如,我收到 m2 然后 m3 然后 m1 然后 m4。

这是为什么呢?是不是因为生产者中的缓冲是多线程的,每个生产者同时生产到主题?

我认为自定义时间戳提取器将有助于在使用消息时对消息进行排序。但他们没有。或者也许我对时间戳提取器的理解是错误的。

我从 SO here 得到了一个解决方案,将所有事件从 tA 流式传输到另一个中间主题(例如 tA'),该中间主题将使用时间戳提取器发送到另一个主题。但我不确定这是否会导致事件根据提取的时间戳重新排序。

我的生产者代码如下所示(我使用 Spring Cloud 来创建生产者): Producer.java

@Service
public class Producer {

    private String topicName = "input-topic";
        
    private ApplicationProperties appProps;
    
    @Autowired
    private KafkaTemplate<String, MyEvent> kafkaTemplate;
    
    public Producer() {
        super();        
    }
    
    @Autowired
    public void setAppProps(ApplicationProperties appProps) {
        this.appProps = appProps;
        this.topicName = appProps.getInput().getTopicName();
    }

    public void sendMessage(String key, MyEvent ce) {
        ListenableFuture<SendResult<String,MyEvent>> future = this.kafkaTemplate.send(this.topicName, key, ce); 
        
    }
}

最佳答案

Why is that ? Is it because the buffering in the producer is multi-threaded with each producing to the topic at the same time ?

默认情况下,生产者允许向代理发送最多 5 个并行的正在进行的请求,因此,如果某些请求失败并重试,则请求顺序可能会更改。

要避免此重新排序问题,您可以设置 max.in.flight.requests.per.connection = 1 (可能会影响性能)或设置 enable.idempotence = true .

顺便说一句:您没有说您的主题是单个分区还是多个分区,以及您的消息是否有 key ?如果您的主题有多个分区,并且您的消息被发送到不同的分区,则无论如何都无法保证读取的顺序,因为偏移量顺序仅在分区内得到保证。

I assumed that the custom timestamp extractor would help in ordering messages when consuming them. But they do not. Or maybe my understanding of the timestamp extractor is wrong.

时间戳提取器仅提取时间戳。 Kafka Streams 不会重新排序任何消息,但始终按偏移顺序处理消息。

If not, then what are the specific uses of the timestamp extractor ? Just to associate a timestamp with an event ?

正确。

I got one solution from SO here, to just stream all events from tA to another intermediate topic (say tA') which will use the TimeStamp extractor to another topic. But I am not sure if this will cause the events to get reordered based on the extracted timestamp.

不,它不会进行任何重新排序。另一个SO问题只是要更改时间戳,但是如果您按顺序a,b,c读取消息,结果将按顺序a,b,c写入(只是使用不同的时间戳,但应保留偏移顺序)。

本次演讲解释了更多细节:https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/

关于apache-kafka - Apache 卡夫卡流: Out-of-Order messages,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67158317/

相关文章:

oracle - Oracle 数据库的 NHibernate <timestamp> 映射导致 StaleStateException

Java 元数据提取器导致 java.lang.NoClassDefFoundError

scala - 是否可以在case语句的正文中(或在提取器将要使用的其他任何地方)使用参数自定义提取器?

spring - kafka消费者中死信队列的好选择是什么

apache-kafka - Apache Kafka 分区保证中的消息顺序

apache-kafka - Storm Trident 拓扑与 Kafka : Received unexpected tuple error

c# - 十进制时间跨度年份和月份 c#

python - 使用 ssl 访问 kafka 代理时出错

java - 在将 Java 日期写入 SQL TIMESTAMP 列之前,JDBC 是否会将日期从 JVM 时区转换为数据库 session 时区?

json - JMeter JSON Extractor,提取字符串中一个键的所有值