java - Handling failure when the Kafka broker is down

Tags: java spring spring-boot apache-kafka spring-kafka

I have a Kafka broker running and messages are consumed successfully, but on the consumer side I want to handle the case where the Kafka broker goes down.

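I have read this thread, but found that the failure only shows up in the logs at DEBUG level. I would like to know whether I can handle this myself when some event is triggered, because I want to deal with the broker failure in my own code. Does Spring Kafka provide anything for this situation?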

Please let me know if more details are needed. I would really appreciate any suggestions that point me in the right direction. Thanks.

Edit 1:

Following @Artem's answer, I have tried this in my KafkaConsumer:

@EventListener
public void handleEvent(NonResponsiveConsumerEvent event) {
    LOGGER.info("*****************************************");
    LOGGER.info("Hello NonResponsiveConsumer {}", event);
    LOGGER.info("*****************************************");     
}

This event fires once even though the Kafka server is running (when I first start the application). See the following logs:

....
....
2017-12-04 13:08:02,177 INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 0
2017-12-04 13:08:02,218 INFO o.a.k.c.c.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [52.214.67.60:9091]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = workerListener
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.0
2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka commitId : cb8625948210849f
2017-12-04 13:08:02,350 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService 
2017-12-04 13:08:02,363 INFO o.s.b.a.e.j.EndpointMBeanExporter - Located managed bean 'auditEventsEndpoint': registering with JMX server as MBean [org.springframework.boot:type=Endpoint,name=auditEventsEndpoint]
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - Hello NonResponsiveConsumer ListenerContainerIdleEvent [timeSinceLastPoll=1.51237491E9s, listenerId=workerListener-0, container=KafkaMessageListenerContainer [id=workerListener-0, clientIndex=-0, topicPartitions=null], topicPartitions=null]
2017-12-04 13:08:02,403 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
....
....
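The timeSinceLastPoll value in that log line is itself a clue. The short Java check below (the class name DecodeTimeSinceLastPoll is just for illustration) reads 1.51237491E9 seconds as a Unix timestamp and lands on the same day the log was written. The value is printed as a float, so it is only accurate to within about a minute. One plausible reading, not confirmed by the log alone, is that the gap was measured from a last-poll time of 0 rather than from container start, so the very first check looked like a roughly 48-year stall; that would fit the report that upgrading spring-kafka removed the false trigger.

```java
import java.time.Instant;

public class DecodeTimeSinceLastPoll {

    /** Interpret a duration in seconds as if it were a Unix timestamp. */
    public static Instant decode(double seconds) {
        return Instant.ofEpochSecond((long) seconds);
    }

    public static void main(String[] args) {
        // Value copied from the log line: timeSinceLastPoll=1.51237491E9s
        System.out.println(decode(1.51237491E9)); // prints 2017-12-04T08:08:30Z
    }
}
```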

Edit 2:

Solved the problem by upgrading spring-kafka to 1.3.2.

Best answer

Starting with version 1.3.1, there is:

/**
 * An event that is emitted when a consumer is not responding to
 * the poll; a possible indication that the broker is down.
 *
 * @author Gary Russell
 * @since 1.3.1
 *
 */
@SuppressWarnings("serial")
public class NonResponsiveConsumerEvent extends KafkaEvent {

And, quoting the documentation:

In addition, if the broker is unreachable (at the time of writing), the consumer poll() method does not exit, so no messages are received, and idle events can’t be generated. To solve this issue, the container will publish a NonResponsiveConsumerEvent if a poll does not return within 3x the pollInterval property. By default, this check is performed once every 30 seconds in each container. You can modify the behavior by setting the monitorInterval and noPollThreshold properties in the ContainerProperties when configuring the listener container. Receiving such an event will allow you to stop the container(s), thus waking the consumer so it can terminate.
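The check the documentation describes can be modelled in plain Java. The sketch below is a simplified, hypothetical model, not spring-kafka's implementation: the class NoPollMonitor, its methods, and the injected clock and callback are illustrative names. It records when each poll returns, and a check meant to run every monitorInterval reports the consumer as non-responsive once the gap exceeds noPollThreshold times the poll timeout (the quoted documentation calls this the pollInterval property). It also seeds the last-poll time at construction, so a check that runs before the first poll does not report a spurious gap. In spring-kafka itself you would react to the published NonResponsiveConsumerEvent instead of a callback, for example by stopping the container(s) as the documentation suggests.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified model of a "no poll" watchdog.
 * Not spring-kafka code; all names here are illustrative.
 */
public class NoPollMonitor {

    private final long pollTimeoutMillis;        // how long a single poll() may block
    private final float noPollThreshold;         // multiplier, 3.0f per the quoted docs
    private final LongSupplier clock;            // current time in ms (injectable for tests)
    private final LongConsumer onNonResponsive;  // receives ms since the last poll

    // Seeded with the start time, not 0: otherwise a check that runs before
    // the first poll would measure the gap from the Unix epoch.
    private volatile long lastPollMillis;

    public NoPollMonitor(long pollTimeoutMillis, float noPollThreshold,
                         LongSupplier clock, LongConsumer onNonResponsive) {
        this.pollTimeoutMillis = pollTimeoutMillis;
        this.noPollThreshold = noPollThreshold;
        this.clock = clock;
        this.onNonResponsive = onNonResponsive;
        this.lastPollMillis = clock.getAsLong();
    }

    /** Call from the consumer thread each time poll() returns. */
    public void pollReturned() {
        lastPollMillis = clock.getAsLong();
    }

    /** One watchdog check; returns true if the consumer looks non-responsive. */
    public boolean check() {
        long sinceLastPoll = clock.getAsLong() - lastPollMillis;
        if (sinceLastPoll > noPollThreshold * pollTimeoutMillis) {
            // In a real application this is where you might stop the container(s).
            onNonResponsive.accept(sinceLastPoll);
            return true;
        }
        return false;
    }

    /** Run check() every monitorIntervalSeconds (30 s per the quoted docs). */
    public ScheduledFuture<?> start(ScheduledExecutorService scheduler, long monitorIntervalSeconds) {
        return scheduler.scheduleAtFixedRate(this::check,
                monitorIntervalSeconds, monitorIntervalSeconds, TimeUnit.SECONDS);
    }
}
```

For example, with a 1 s poll timeout and a threshold of 3.0, a check 3.5 s after the last poll reports the consumer, while a check 2 s after it does not.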

Original question on Stack Overflow: https://stackoverflow.com/questions/47524315/