java - KafkaPublisher blocks when the cluster is down

Tags: java apache-kafka

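I am using kafka-clients-0.8.2.1.jar and found that send blocks when the cluster is not up at all. I found more information online under "Kafka 0.8.2 new producer blocking on metadata" and learned that this is a known issue that many people face. I tried some of the options mentioned there, but they did not help:

1. Calling KafkaProducer.partitionsFor() still blocks.
2. Setting producer.type to async is not recognized.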

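What I want to achieve is to detect that the cluster is down, queue messages (up to a maximum limit), and send them when the cluster comes back. If that is too complicated, send should at least not block, because blocking causes the application to queue up all of its messages and run out of memory.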

Code:

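System.out.println("props:" + props);
producer = new KafkaProducer<String, String>(props);

// Waits for topic metadata, so this call blocks while the cluster is unreachable.
producer.partitionsFor(record.topic());
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Callback body not shown in the original post.
    }
});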

Logs:

props:{queue.size=1000, reconnect.backoff.ms=10000, request.timeout.ms=1000, bootstrap.servers=tstaapp001:59092,ewdlxsrv283:59092,devcapp001:59092, value.serializer=org.apache.kafka.common.serialization.StringSerializer, request.required.acks=1, buffer.memory=33554432, retries=0, producer.type=async, key.serializer=org.apache.kafka.common.serialization.StringSerializer, linger.ms=1, topic.metadata.refresh.interval.ms=1000, batch.size=16384, timeout.ms=10000}
20:44:33.584 [RTEMPool-1-RTEMThread-4] DEBUG o.a.k.c.producer.internals.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(tstaapp001, 59092), Node(devcapp001, 59092), Node(ewdlxsrv283, 59092)], partitions = [])
20:44:33.603 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
20:44:33.603 [RTEMPool-1-RTEMThread-4] WARN  o.a.k.c.producer.ProducerConfig - The configuration topic.metadata.refresh.interval.ms = null was supplied but isn't a known config.
20:44:33.603 [RTEMPool-1-RTEMThread-4] WARN  o.a.k.c.producer.ProducerConfig - The configuration request.timeout.ms = null was supplied but isn't a known config.
20:44:33.603 [RTEMPool-1-RTEMThread-4] WARN  o.a.k.c.producer.ProducerConfig - The configuration producer.type = null was supplied but isn't a known config.
20:44:33.604 [RTEMPool-1-RTEMThread-4] WARN  o.a.k.c.producer.ProducerConfig - The configuration request.required.acks = null was supplied but isn't a known config.
20:44:33.604 [RTEMPool-1-RTEMThread-4] WARN  o.a.k.c.producer.ProducerConfig - The configuration queue.size = null was supplied but isn't a known config.
20:44:33.604 [RTEMPool-1-RTEMThread-4] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka producer started
20:44:42.958 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:42.959 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Init connection to node -2 for sending metadata request in the next iteration
20:44:42.959 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -2 at ewdlxsrv283:59092.
20:44:42.964 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.063 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.163 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.264 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.364 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.464 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.564 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.664 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.764 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.864 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:43.964 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:44.064 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:44.164 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:44.264 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:44.364 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:44.464 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -2
20:44:44.479 [kafka-producer-network-thread | producer-1] WARN  o.a.kafka.common.network.Selector - Error in I/O with null
java.net.ConnectException: Connection refused: no further information

Best answer

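The new producer API blocks when its internal buffer is full (this is not well documented). You can set the property block.on.buffer.full to false, in which case you get a BufferExhaustedException instead of a blocked call.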
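A minimal sketch of such a configuration, assuming the new-producer key names from kafka-clients 0.8.2.x. The class name, the broker address, and the 1000 ms value are placeholders. The metadata.fetch.timeout.ms entry is not part of the answer above; it is included on the assumption that the blocking in the question is a wait for topic metadata rather than a full buffer.

```java
import java.util.Properties;

// Sketch only: builds settings for the new (Java) producer in kafka-clients 0.8.2.x.
// The class name is hypothetical; only the property keys refer to the Kafka client.
class NonBlockingProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // New-producer name for the acknowledgement setting; the log in the
        // question reports request.required.acks as an unknown config.
        props.setProperty("acks", "1");
        props.setProperty("retries", "0");
        // Upper bound, in bytes, on memory used to buffer unsent records.
        props.setProperty("buffer.memory", "33554432");
        // Fail with BufferExhaustedException instead of blocking on a full buffer.
        props.setProperty("block.on.buffer.full", "false");
        // Assumption: illustrative value. Bounds how long the producer waits
        // for topic metadata before giving up.
        props.setProperty("metadata.fetch.timeout.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = build("localhost:9092");
        System.out.println(props.getProperty("block.on.buffer.full"));
    }
}
```

The resulting Properties object would be passed to new KafkaProducer<String, String>(props) as in the question. Old-producer keys such as producer.type and queue.size are left out because the log shows the new producer does not recognize them.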

See http://kafka.apache.org/082/documentation.html#producerapi
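For the second goal in the question (queue messages up to a maximum and send them once the cluster is back), one possible approach is a bounded queue in front of the producer. The following is a sketch under stated assumptions, not something the Kafka client provides: BoundedOutbox is a hypothetical helper, and the trySend predicate stands in for whatever code calls producer.send and reports whether the hand-off succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Predicate;

// Hypothetical helper, not part of kafka-clients: a bounded holding area for
// messages that could not be sent while the cluster was unreachable.
class BoundedOutbox {
    private final BlockingQueue<String> pending;

    BoundedOutbox(int maxMessages) {
        this.pending = new ArrayBlockingQueue<>(maxMessages);
    }

    // Never blocks: returns false (message rejected) once the limit is reached.
    boolean enqueue(String message) {
        return pending.offer(message);
    }

    int size() {
        return pending.size();
    }

    // Tries to send queued messages in order. trySend returns true when a
    // message was handed off; the first failure stops the drain and leaves
    // that message and everything after it in the queue.
    // Intended to be called from a single thread.
    int drain(Predicate<String> trySend) {
        int sent = 0;
        String next;
        while ((next = pending.peek()) != null) {
            if (!trySend.test(next)) {
                break;
            }
            pending.poll();
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        BoundedOutbox outbox = new BoundedOutbox(2);
        outbox.enqueue("a");
        outbox.enqueue("b");
        boolean accepted = outbox.enqueue("c"); // rejected, the limit is 2
        List<String> delivered = new ArrayList<>();
        int sent = outbox.drain(delivered::add); // stand-in for a real send
        System.out.println(accepted + " " + sent + " " + delivered);
    }
}
```

Because enqueue uses offer rather than put, a full queue rejects the message immediately, so memory use stays bounded and the caller decides whether to drop or log the message.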

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/37849749/
