spring - Spring @KafkaListener在一定间隔后执行并轮询记录

标签 spring spring-kafka

我们想在一定间隔(例如每5分钟)后使用记录。
消费者属性是标准的:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(300000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}

即使当我更改属性setPollTimeout时,它在定义的时间间隔(5分钟)后也不会轮询,但在30秒后会持续轮询,这是我的日志:
2018-01-23 18:07:26.875 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 2

2018-01-23 18:07:56.901 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 4

我们试图构建一个带有窗口聚合的kafka流应用程序,并计划在y间隔后使用窗口x。

我可以看到在类KafkaMessageListenerContainer中,设置了setConsumerTaskExecutor:
if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }

但是我们如何配置这个(频率)线程池何时轮询记录。任何帮助表示赞赏。

最佳答案

您无法控制使用者的轮询速率,pollTimeout是poll()等待新记录到达的时间。如果新记录到达的频率更高,它将不会等待那么长时间。

如果希望控制接收记录的速率,只需使用DefaultKafkaConsumerFactory创建使用者并在需要时对其进行轮询。

但是,您不能将其与@KafkaListener一起使用-您必须自己处理记录。

关于spring - Spring @KafkaListener在一定间隔后执行并轮询记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48402355/

相关文章:

java - LdapRepository 更新 spring-ldap

java - Spring中使用@RequestBody注解的GET请求

spring-integration - spring-cloud-stream-kafka 应用程序启动后仅消费最新消息

apache-kafka-streams - Tombstone 消息没有从 KTable 状态存储中删除记录?

java - Spring kafka @KafkaListener 没有被调用

spring-boot - springboot-kafka java 8时间序列化

spring - Spring Boot 未使用 Jackson Kotlin 插件

java - Singleton Bean 中可能存在的竞争条件

java - Spring Boot 应用程序 Tomcat 服务器未运行

spring - Spring 卡夫卡中有多个生产者的代码示例吗?