spring boot activemq消费者连接池

标签 spring spring-boot activemq connection-pooling consumer

Spring Boot ActiveMQ消费者连接池需要配置吗?我在 spring boot 应用程序中只有一个消费者(作为微服务),生产者在另一个应用程序中。我对以下内容有点困惑:(摘自 http://activemq.apache.org/spring-support.html )

注意:虽然 PooledConnectionFactory 确实允许创建事件消费者的集合,但它不会“汇集”消费者。池化对于连接、 session 和生产者来说很有意义,它们可能是很少使用的资源,创建起来很昂贵,并且可以以最小的成本保持闲置。另一方面,消费者通常只是在启动时创建并继续运行,处理传入的消息。当消费者完成时,最好将其关闭而不是让它闲置并将其返回到池中供以后重用:这是因为,即使消费者空闲,ActiveMQ 也会继续将消息传递到消费者的预取缓冲区,在消费者再次活跃之前,他们会被阻止。

在同一页面上,我可以看到:
您可以使用 activemq-pool org.apache.activemq.pool.PooledConnectionFactory 为您的消费者集合有效地汇集连接和 session ,或者您可以使用 Spring JMS org.springframework.jms.connection.CachingConnectionFactory 实现同样的效果

我尝试了 CachingConnectionFactory(它可以采用 ActiveMQConnectionFactory),它只有几个 setter 来保存 cacheConsumers(boolean)、cacheProducers(boolean),与连接池无关。我知道 1 个连接可以给你多个 session ,然后每个 session 你有多个消费者/生产者。但我的问题是对于消费者来说,我们如何汇集,因为上面的声明是说让它默认。所以我只用了一种方法:
@Bean public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer 配置器) {

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This provides all boot's default to this factory, including the message converter
    factory.setConcurrency("3-10");
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;
}</em><br>

Dynamic scaling这个链接也暗示了这一点,但我找不到具体的解决方案。

有人遇到过这种情况吗,请指教。感谢您阅读这篇文章,非常感谢任何帮助。 生产的其他详细信息:

  • 此消费者每秒将收到约 500 条消息。
  • 使用 Spring Boot 版本 1.5.8.RELEASE,
  • ActiveMQ 5.5 是我的 JMS

  • 最佳答案

    activemq中有一个名为org.apache.activemq.jms.pool的包,它提供了PooledConsumer。下面是代码。请检查并查看它是否适合您。我知道这不是 spring 方式,但您可以轻松地管理自定义轮询方法。

    PooledConnectionFactory pooledConFactory = null;
        PooledConnection pooledConnection = null;
        PooledSession pooledSession = null;
        PooledMessageConsumer pooledConsumer = null;
        Message message = null;
        try
        {
            // Get the connection object from PooledConnectionFactory
            pooledConFactory = ( PooledConnectionFactory ) this.jmsTemplateMap.getConnectionFactory();
            pooledConnection = ( PooledConnection ) pooledConFactory.createConnection();
            pooledConnection.start();
    
            // Create the PooledSession from pooledConnection object
            pooledSession = ( PooledSession ) pooledConnection.createSession( false, 1 );
    
            // Create the PooledMessageConsumer from session with given ack mode and destination
            pooledConsumer = ( PooledMessageConsumer ) pooledSession.
                    createConsumer( this.jmsTemplateMap.getDefaultDestination(), <messageFilter if any>);
    
            while ( true )
            {
                message = pooledConsumer.receiveNoWait();
                if ( message != null) 
                    break;
            }
    
        }
        catch ( JMSException ex )
        {
            LOGGER.error("JMS Exception occured, closing the session", ex );
        }
        return message;
    

    关于spring boot activemq消费者连接池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47718492/

    相关文章:

    java - 多种配置文件- Spring Bean 注入(inject)

    java - Spring Boot - 使用自定义对象数组 JSON 序列化自定义对象

    java - spring @autowired 如何用于接口(interface)和实现类

    cassandra - Apache Cassandra 作为 ActiveMQ 的消息数据存储

    java - STS映射 Controller 查看

    java - 配置文件上的 Spring 自动配置顺序/优先级

    java - 环境特定的 Eureka 属性未加载

    java - 在java spring中使用ActiveMq为多个项目创建公共(public)代理

    apache-camel - Camel ActiveMQ 客户端阻塞,临时存储使用率立即达到 100%

    java - Spring 4 - HTTP 状态 400,所需参数不存在