spring - 使用 2 个消费者配置时消费者吞吐量缓慢

标签 spring spring-integration apache-zookeeper apache-kafka

使用 spring-integration-kafka 扩展和以下配置:

<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="10000"
    zk-session-timeout="10000" zk-sync-time="2000" />

<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
                group-id="realtime-services-consumer-grp" 
                value-decoder="purchaseDecoder" 
                key-decoder="kafkaReflectionDecoder"
                max-messages="5" >
            <int-kafka:topic id="purchase" streams="1" />
        </int-kafka:consumer-configuration>
        <int-kafka:consumer-configuration 
                group-id="realtime-services-consumer-gw"
                value-decoder="eventDecoder" 
                key-decoder="kafkaReflectionDecoder" 
                max-messages="10" >
            <int-kafka:topic id="event" streams="1" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
    auto-startup="true" channel="inputFromKafka">
    <int:poller fixed-delay="20" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>

例如,当我评论第一个 consumer-configuration 时,我每分钟可以处理 300 个事件,没有任何问题。但当两者都被激活时。我的吞吐量非常低。来自两个主题的总吞吐量低于每分钟 50 个。

有人知道为什么我在阅读 2 个主题时表现这么差吗?我在配置中做错了什么?

最佳答案

感谢您指出这一点!

在与我本地的 Kafka cluster 进行了一番激烈的斗争之后,我已经能够重现您的问题,并且我为您提供了一些解决方法:-)。

首先它不是round-robin ,但是一一:

for (final ConsumerConfiguration<K, V> consumerConfiguration : getConsumerConfigurations().values()) {
    Map<String, Map<Integer, List<Object>>> messages = consumerConfiguration.receive();

其中每一个consumerConfiguration在此期间被阻止在后台consumer-timeout="5000" ,如果 KafkaStream 中没有消息现在。因此整个poll任务来自<int-kafka:inbound-channel-adapter>被阻塞直到超时或更糟:如果每个主题没有消息,则整个等待超时是超时的总和!

要解决此问题,您可以减少 consumer-timeout="5000"或提供几个<int-kafka:consumer-context>因此<int-kafka:inbound-channel-adapter>对于每个主题。

是的,这看起来很奇怪,而且我们在发布前没有找到时间来看看这真的很糟糕,但无论如何请随意提出 JIRA问题来解决它。

谢谢!

关于spring - 使用 2 个消费者配置时消费者吞吐量缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28412482/

相关文章:

spring - Maven tomcat :run plugin and datasource configuration via JNDI

java - TcpInboundGateway 服务器配置 - 超时时发送自定义消息

spring-boot - Spring Boot 上 Spring 集成的 Kafka 配置

java - 如何对 Linux 上的 Apache Accumulo 安装进行故障排除?

Spring 请求映射通配符异常

spring - 识别 Spring MVC 架构模式

spring - 如何为 ServletContextPropertyPlaceholderConfigurer 设置默认上下文参数值?

java - Spring集成UDP服务器不监听

apache - 尝试创建zNode时Apache Curator未实现的错误

java - 客户端重启后如何重新获取InterProcessMutex