I have a Kafka broker running and messages are being consumed successfully, but I want to handle the case where the Kafka broker goes down, on the consumer side.
I have read this thread, but found that the relevant logs are only emitted at debug level. I would like to know whether I can handle this myself via an event trigger, because I want to deal with the Kafka broker failure in my own code. Does Spring Kafka provide anything to handle this situation?
Let me know if more details are needed. I would greatly appreciate any advice that points me in the right direction. Thanks.
Edit 1:
As per @Artem's answer, I have tried this in my KafkaConsumer:
@EventListener
public void handleEvent(NonResponsiveConsumerEvent event) {
LOGGER.info("*****************************************");
LOGGER.info("Hello NonResponsiveConsumer {}", event);
LOGGER.info("*****************************************");
}
This event fires once even when the Kafka server is up (when I first start the application). See the following log:
....
....
2017-12-04 13:08:02,177 INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 0
2017-12-04 13:08:02,218 INFO o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [52.214.67.60:9091]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = workerListener
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.0
2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka commitId : cb8625948210849f
2017-12-04 13:08:02,350 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
2017-12-04 13:08:02,363 INFO o.s.b.a.e.j.EndpointMBeanExporter - Located managed bean 'auditEventsEndpoint': registering with JMX server as MBean [org.springframework.boot:type=Endpoint,name=auditEventsEndpoint]
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - Hello NonResponsiveConsumer ListenerContainerIdleEvent [timeSinceLastPoll=1.51237491E9s, listenerId=workerListener-0, container=KafkaMessageListenerContainer [id=workerListener-0, clientIndex=-0, topicPartitions=null], topicPartitions=null]
2017-12-04 13:08:02,403 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
....
....
Edit 2:
The problem was solved by upgrading spring-kafka to 1.3.2.
Best answer
Starting with version 1.3.1, there is:
/**
* An event that is emitted when a consumer is not responding to
* the poll; a possible indication that the broker is down.
*
* @author Gary Russell
* @since 1.3.1
*
*/
@SuppressWarnings("serial")
public class NonResponsiveConsumerEvent extends KafkaEvent {
And quoting the documentation:
In addition, if the broker is unreachable (at the time of writing), the consumer poll() method does not exit, so no messages are received, and idle events can't be generated. To solve this issue, the container will publish a NonResponsiveConsumerEvent if a poll does not return within 3x the pollInterval property. By default, this check is performed once every 30 seconds in each container. You can modify the behavior by setting the monitorInterval and noPollThreshold properties in the ContainerProperties when configuring the listener container. Receiving such an event will allow you to stop the container(s), thus waking the consumer so it can terminate.
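The tuning described in that quote can be sketched as follows. This is only an illustration, assuming spring-kafka 1.3.1+ on the classpath; the bean names are arbitrary, and stopping the container via the event's source is one possible reaction (the event also carries getListenerId() for filtering when multiple containers are running):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.kafka.listener.MessageListenerContainer;

@Configuration
public class KafkaListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Check the consumer thread every 10 seconds instead of the default 30.
        factory.getContainerProperties().setMonitorInterval(10);
        // Publish NonResponsiveConsumerEvent when no poll has completed
        // within noPollThreshold x the poll timeout (3.0 is the default).
        factory.getContainerProperties().setNoPollThreshold(3.0f);
        return factory;
    }

    @EventListener
    public void onNonResponsiveConsumer(NonResponsiveConsumerEvent event) {
        // The event source is the listener container; stopping it wakes the
        // consumer so it can terminate (per the documentation quoted above).
        ((MessageListenerContainer) event.getSource()).stop();
    }
}
```

Restarting the container later (for example from a scheduled broker health check) would then be an application-level decision; Spring Kafka only surfaces the event.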
Regarding "java - Handling failure when the Kafka broker goes down", the original question on Stack Overflow is: https://stackoverflow.com/questions/47524315/