java - Transactional producer vs. idempotent producer in Java (OutOfOrderSequenceException)

Tags: java apache-kafka kafka-producer-api spring-kafka

I am using spring-kafka with an idempotent producer configuration.

These are my configuration properties:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers()));
    // Configure the following three settings for SSL encryption
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProps.getJksLocation());
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, appProps.getJksPassword());
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.RETRIES_CONFIG, 5);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

My Kafka producer throws an OutOfOrderSequenceException:

2019-03-06 21:25:47 Sender [ERROR] [Producer clientId=producer-1] The broker returned org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number for topic-partition topic-1 at offset -1. This indicates data loss on the broker, and should be investigated.
2019-03-06 21:25:47 TransactionManager [INFO] [Producer clientId=producer-1] ProducerId set to -1 with epoch -1
2019-03-06 21:25:47 ProducerKafka [ERROR] we encountered error while sending to kafka, please retry the job

I am not sure why this exception is thrown, and I could not find a specific answer. The official javadoc for the exception says:

This exception indicates that the broker received an unexpected sequence number from the producer, which means that data may have been lost. If the producer is configured for idempotence only (i.e. if enable.idempotence is set and no transactional.id is configured), it is possible to continue sending with the same producer instance, but doing so risks reordering of sent records. For transactional producers, this is a fatal error and you should close the producer.

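Does this mean I need to use a transactional producer to avoid this problem?

The KafkaProducer documentation says something that makes the statement above ambiguous: https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html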

To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will be defaulted to Integer.MAX_VALUE, the max.in.flight.requests.per.connection config will be defaulted to 1, and acks config will be defaulted to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.

To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.

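The passage above clearly says that all an idempotent producer needs is the enable.idempotence property. The exception javadoc, however, seems to say that I have to set the transactional.id property.

What is the correct way to create an idempotent asynchronous producer without having to deal with a fatal OutOfOrderSequenceException?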

Best answer

If you set retries explicitly, then you must also set

max.in.flight.requests.per.connection=1

to avoid out-of-order problems.
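
As a rough sketch of what that change could look like, the following builds producer properties similar to the ones in the question and adds the in-flight limit. It uses the plain string config keys so that it runs without the Kafka client on the classpath. The class name `IdempotentProducerProps`, the method `build`, and the broker address are made up for illustration, and the SSL settings from the question are left out.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka or spring-kafka.
final class IdempotentProducerProps {

    // Builds idempotent-producer settings with explicit retries and a single in-flight request.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("enable.idempotence", "true");
        // Retries are set explicitly, as in the question.
        props.put("retries", "5");
        // The setting the answer recommends adding: at most one unacknowledged request per connection.
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092"); // placeholder broker address
        // Print the settings in a stable order for inspection.
        props.stringPropertyNames().stream().sorted()
                .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The resulting `Properties` object could then be passed to `new KafkaProducer<>(props)`. Limiting in-flight requests to one trades some throughput for the ordering guarantee.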

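The documentation for the retries setting states this quite clearly:

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.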
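
The reordering scenario in that passage can be illustrated with a toy model. This is not Kafka code: the "partition log" is just a list of batch names, the broker is assumed to fail one chosen batch exactly once, and sequence numbers (which the broker tracks when idempotence is enabled) are not modelled at all. The names `ReorderingDemo` and `simulate` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of retries with a configurable number of in-flight batches.
final class ReorderingDemo {

    // Returns the order in which batches end up in the log when the batch
    // named failsOnce hits a transient error on its first attempt.
    static List<String> simulate(List<String> batches, String failsOnce, int maxInFlight) {
        Deque<String> pending = new ArrayDeque<>(batches);
        List<String> log = new ArrayList<>();
        boolean alreadyFailed = false;
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches without waiting for acknowledgements.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            List<String> toRetry = new ArrayList<>();
            for (String batch : inFlight) {
                if (batch.equals(failsOnce) && !alreadyFailed) {
                    alreadyFailed = true; // transient error on the first attempt
                    toRetry.add(batch);
                } else {
                    log.add(batch); // the broker appends the batch
                }
            }
            // Failed batches go back to the front of the queue to be retried.
            for (int i = toRetry.size() - 1; i >= 0; i--) {
                pending.addFirst(toRetry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = Arrays.asList("A", "B");
        System.out.println(simulate(batches, "A", 1)); // prints [A, B]
        System.out.println(simulate(batches, "A", 2)); // prints [B, A]
    }
}
```

With one batch in flight, B is not sent until the retry of A has succeeded, so the order is preserved. With two in flight, B is appended while A is still waiting to be retried.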

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/55192852/
