Hi Camel/JMS developers. I am using the Apache Camel AMQP JMS connector, with ActiveMQ as the broker.
My configuration is the default one.
Here is a sample consumer:
public static void main(String[] args) throws Exception {
    AMQPComponent amqpComponent = AMQPComponent.amqpComponent(HOST, USER, PWD);
    CamelContext context = new DefaultCamelContext();
    context.addComponent("amqp", amqpComponent);
    context.start();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("amqp:queue:1test.queue?transacted=true")
                .to("stream:out")
                .end();
        }
    });
    Thread.sleep(20 * 1000);
    context.stop();
}
As you can see, I have configured a transacted consumer for 1test.queue. When I run it, I see this in the log:
[main] INFO org.apache.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: amqp://queue:1test.queue?transacted=true
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:1 connected to remote Broker: amqp:HOST2
[AmqpProvider :(2):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(2):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:2 connected to remote Broker: amqp:HOST2
[AmqpProvider :(3):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(3):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:3 connected to remote Broker: amqp:HOST2
If I remove ?transacted=true from the consumer:
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:1 connected to remote Broker: amqp:HOST2
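the connection message appears only once.
How can this behaviour be explained? Is it normal for transacted consumers in Camel?
Thanks.
P.S. I have checked this topic, but I am not sure how to map it onto Camel.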
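Best answer
The AMQP Camel component does not define a pooled connection factory, so it creates a new connection on every iteration when it checks the broker for messages.
To avoid this, define a CachingConnectionFactory for the AMQPComponent, as follows: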
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.connection.CachingConnectionFactory;

// The class name and the property keys below are placeholders; adjust them to your application.
@Configuration
public class AmqpConfig {

    @Value("${broker.user}")
    private String brokerUser;

    @Value("${broker.password}")
    private String brokerPassword;

    @Value("${broker.url}")
    private String brokerUrl;

    // Plain Qpid JMS factory: opens a new connection every time one is requested.
    @Bean
    public JmsConnectionFactory jmsConnectionFactory() {
        return new JmsConnectionFactory(brokerUser, brokerPassword, brokerUrl);
    }

    // Wraps the plain factory so that one connection is reused.
    @Bean
    @Primary
    public CachingConnectionFactory jmsCachingConnectionFactory(JmsConnectionFactory jmsConnectionFactory) {
        return new CachingConnectionFactory(jmsConnectionFactory);
    }

    // Hands the caching factory to Camel and keeps the consumer cached between polls.
    @Bean
    public JmsConfiguration jmsConfig(CachingConnectionFactory cachingConnectionFactory) {
        JmsConfiguration jmsConfiguration = new JmsConfiguration();
        jmsConfiguration.setConnectionFactory(cachingConnectionFactory);
        jmsConfiguration.setCacheLevelName("CACHE_CONSUMER");
        return jmsConfiguration;
    }

    @Bean
    public AMQPComponent amqpComponent(JmsConfiguration jmsConfiguration) {
        AMQPComponent amqpComponent = new AMQPComponent();
        amqpComponent.setConfiguration(jmsConfiguration);
        return amqpComponent;
    }
}
You will then get the same behaviour as with the JMS Camel component.
More information is available on the AMQP Camel Component page.
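The difference the answer describes, one new connection per polling iteration versus a single reused connection, can be pictured with a small toy model. This is only a sketch using the plain JDK: FakeConnection, PlainFactory, CachingFactory and poll are hypothetical names made up for illustration, and none of it is the actual Camel, Spring or Qpid implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ConnectionReuseSketch {

    /** Stand-in for a broker connection; it only carries an id. */
    static final class FakeConnection {
        final int id;
        FakeConnection(int id) { this.id = id; }
    }

    /** Opens a brand-new connection on every call, like an unpooled factory. */
    static final class PlainFactory implements Supplier<FakeConnection> {
        final AtomicInteger opened = new AtomicInteger();

        @Override
        public FakeConnection get() {
            return new FakeConnection(opened.incrementAndGet());
        }
    }

    /** Wraps another factory and hands out the same connection every time. */
    static final class CachingFactory implements Supplier<FakeConnection> {
        private final Supplier<FakeConnection> target;
        private FakeConnection cached;

        CachingFactory(Supplier<FakeConnection> target) { this.target = target; }

        @Override
        public synchronized FakeConnection get() {
            if (cached == null) {
                cached = target.get(); // first request: open the real connection
            }
            return cached;             // later requests: reuse it
        }
    }

    /** Simulates a consumer that asks the factory for a connection on each poll. */
    static void poll(Supplier<FakeConnection> factory, int iterations) {
        for (int i = 0; i < iterations; i++) {
            factory.get(); // a real consumer would try to receive a message here
        }
    }

    public static void main(String[] args) {
        PlainFactory uncached = new PlainFactory();
        poll(uncached, 3);
        System.out.println("without caching, connections opened: " + uncached.opened.get());

        PlainFactory underlying = new PlainFactory();
        poll(new CachingFactory(underlying), 3);
        System.out.println("with caching, connections opened: " + underlying.opened.get());
    }
}
```

Running main prints 3 for the uncached case and 1 for the cached case, which mirrors the three "connected to remote Broker" lines in the question's log versus the single line seen without ?transacted=true.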
Regarding java - Apache Camel ?transacted=true, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/51742970/