我们想在一定间隔(例如每5分钟)后使用记录。
消费者属性是标准的:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(300000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
return factory;
}
即使当我更改属性
setPollTimeout
时,它在定义的时间间隔(5分钟)后也不会轮询,但在30秒后会持续轮询,这是我的日志:2018-01-23 18:07:26.875 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 2
2018-01-23 18:07:56.901 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 4
我们试图构建一个带有窗口聚合的kafka流应用程序,并计划在y间隔后使用窗口x。
我可以看到在类
KafkaMessageListenerContainer
中,设置了setConsumerTaskExecutor
:if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
但是我们如何配置这个(频率)线程池何时轮询记录。任何帮助表示赞赏。
最佳答案
您无法控制使用者的轮询速率,pollTimeout是poll()
等待新记录到达的时间。如果新记录到达的频率更高,它将不会等待那么长时间。
如果希望控制接收记录的速率,只需使用DefaultKafkaConsumerFactory
创建使用者并在需要时对其进行轮询。
但是,您不能将其与@KafkaListener
一起使用-您必须自己处理记录。
关于spring - Spring @KafkaListener在一定间隔后执行并轮询记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48402355/