java - Kafka - Stop retrying on ConnectException

Tags: java apache-kafka

I have a Kafka producer defined like this:

public KafkaMessageProducer(String kafkaHost, String kafkaPort, Map<String, String> map) {
    this.kafkaTopic = map;
    final Properties properties = new Properties();
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);
    producer = new KafkaProducer<String, String>(properties);
}

I am sending messages with the following code (I have also tried using a callback):

public void sendMessage(String topic, RestCommonResource resultToken) {
    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode jsonNode = objectMapper.valueToTree(resultToken);
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, jsonNode.toString());
    producer.send(record);
}

However, if the Kafka server is down and the producer publishes a message, the program gets stuck in an infinite loop with the following exception:

WARN  [2018-09-13 06:27:59,589] org.apache.kafka.common.network.Selector: Error in I/O with localhost/127.0.0.1
! java.net.ConnectException: Connection refused: no further information
! at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_80]
! at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) ~[na:1.7.0_80]
! at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
! at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]

Is there any property I can set to stop the retries and drop the message?

Best Answer

Currently, if a Kafka client loses a connection with brokers, it will wait for reconnect.backoff.ms milliseconds before attempting to reconnect.

While this strategy works well when a client is disconnected for a short time, if a single broker or the entire cluster becomes unavailable for a long time, all clients will quickly generate a lot of connections.

In addition, developers have limited control over a client which constantly loses its connections with the cluster.

I think this topic will be useful to you: Add custom policies for reconnect attempts to NetworkdClient

reconnect.backoff.ms : The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.

reconnect.backoff.max.ms : The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
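Building on those two settings, here is a minimal sketch of how the producer could be configured to back off between reconnects and to give up on a record instead of retrying forever. It is an illustration, not the asker's code: the class name BoundedRetryProducer, the topic "test-topic", the bootstrap address and all timeout values are assumptions, and reconnect.backoff.max.ms / delivery.timeout.ms need a newer kafka-clients version than the 0.8.2.1 shown in the stack trace.

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedRetryProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Wait at least 1s between reconnect attempts instead of retrying in a tight loop.
        properties.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        // Cap the exponential backoff at 10s (requires a client that supports reconnect.backoff.max.ms).
        properties.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "10000");
        // Fail send()/metadata lookups after 5s instead of blocking for the 60s default.
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
        // Give up on a record after 10s in total; delivery.timeout.ms (kafka-clients 2.1+) must be
        // >= request.timeout.ms + linger.ms, so request.timeout.ms is lowered as well.
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("test-topic", "{\"status\":\"ok\"}");

            // With a callback the send stays asynchronous; once the timeouts above expire
            // the callback receives the exception and the record is simply dropped.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Dropping record, send failed: " + exception);
                    }
                }
            });

            producer.flush();
        }
    }
}

With a combination like this the sender thread still backs off and reconnects in the background, but an individual message fails after roughly delivery.timeout.ms and is handed to the callback rather than blocking the caller indefinitely, which is about as close to "stop retrying and drop the message" as the producer configuration gets.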

Regarding java - Kafka - Stop retrying on ConnectException, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/52307867/
