java - Camel 消耗队列中的消息但无法重新创建它

标签 java oracle apache-camel jms oracle-aq

我遇到一个问题,我的对应方正在将消息放置在 Oracle 上的 JMS TEXT 队列上(某些自定义应用程序不使用 java),而我要拾取它。路线很简单:

<!--Receive data from oracle queue -->
<route id="ReadFromQueue" autoStartup="true">
    <from uri="oracleQueue:queue:SCAQUSER.SCD2TC?jmsMessageType=Text" />
    <log message="Picked up message ${body}" />
    <to uri="file:/e:/Tradechannel/in" />
</route>

连接已启动并正在运行,当消息放入队列时,我的路由会立即响应。当使用 oracle.jms.traceLevel=6 运行代码时,至少在我看来一切都工作正常。

Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  Timeout: 1 seconds.  Iterations: 1 Interval: 120 Last interval: 1
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  wait time: 1
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  payload type:SYS.AQ$_JMS_TEXT_MESSAGEpayload type code:2002
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.getCompliant:  Session :false
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData:  entry
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData:  textLen: 288
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData:  exit
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  text_message retrieved
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  set AQ enqueue time as 1576249545000
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  msg_id: 999832415E09038AE0535C35EB751E09 corrid: null priority: 1
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  msg_delay(secs): 0
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  exptime(secs): -1
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  attempts: 0
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  exception queue: null
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.getCompliant:  Session :false
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  recv_time: 1576746524523
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked:  entry
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked:  oracle.jms.useEmulatedXA is on
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.inGlobalTrans:  entry, reCheck=true
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.checkForGlobalTxn:  entry
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.inGlobalTrans:  exit
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked:  exit
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.restartConsumers:  entry
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.doCommit:  acknowledged one message received by committing the database connection
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  exit

The message is removed from the queue but in my application log I get an error...

2019-12-19 10:08:44:554 o.a.c.c.jms.EndpointMessageListener WARN  - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - JMSXGroupSeq]
org.apache.camel.RuntimeCamelException: JMSXGroupSeq
    at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:225)
    at org.apache.camel.component.jms.JmsMessage.populateInitialHeaders(JmsMessage.java:235)
    at org.apache.camel.impl.DefaultMessage.createHeaders(DefaultMessage.java:258)
    at org.apache.camel.component.jms.JmsMessage.ensureInitialHeaders(JmsMessage.java:220)
    at org.apache.camel.component.jms.JmsMessage.getHeader(JmsMessage.java:170)
    at org.apache.camel.impl.DefaultMessage.getHeader(DefaultMessage.java:94)
    at org.apache.camel.impl.DefaultUnitOfWork.(DefaultUnitOfWork.java:115)
    at org.apache.camel.impl.DefaultUnitOfWork.(DefaultUnitOfWork.java:75)
    at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:34)
    at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:695)
    at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:663)
    at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:634)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:149)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: javax.jms.MessageFormatException: JMS-117: Conversion failed - invalid property type
    at oracle.jms.AQjmsError.throwMsgFormatEx(AQjmsError.java:452)
    at oracle.jms.AQjmsMessage.getObjectProperty(AQjmsMessage.java:1584)
    at org.apache.camel.component.jms.JmsMessageHelper.getProperty(JmsMessageHelper.java:118)
    at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:214)
    ... 25 common frames omitted

If I understand the stacktrace correctly it is the oracle.jms api that is missing a header JMSXGroupSeq which is not unlikely since the counterpart isn't using the oracle jms api to create the message. Neither him nor me is able to view the actual queue (it's on our mutual client's side) so I have no means of checking what headers are actually in place.

Using a custom headerFilterStrategy is no help since the error is thrown right before I get to the headerFilterStrategy. See below extract from org.apache.camel.component.jms.JmsBinding method extractHeadersFromJms. The error is thrown from JmsMessageHelper.getProperty()

Object value = JmsMessageHelper.getProperty(jmsMessage, name);
if (headerFilterStrategy != null 
 && headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
 continue;
 }

所以我的第一个问题是......

是否可以接收可能“无效”的消息 - 我可以让camel和oracle api忽略丢失/无效的 header 吗?

第二个问题...为什么 Oracle 会确认该消息,就好像一切正​​常,而实际上该消息从未在接收方成功重新创建?

最佳答案

由于错误消息“转换失败”,我怀疑 JMS 属性 JMSXGroupSeq 具有无效类型

由于Camel只是尝试在JmsMessageHelper.getProperty中读取这个值,我猜这个属性中一定有一些非常奇怪的值。

您是否尝试过使用 JMS Toolbox 等工具来查看这样的消息? ?如果您看到属性(property)值(value),您也许可以建议生产商如何修复它。

回答您的问题:

是否可以收到可能“无效”的消息?

对于 Camel,我不这么认为。 Camel 擅长让您的生活变得方便,因此可以完成所有低级别的工作,并为您提供一个带有消息的漂亮 Exchange 对象。

如果在这部分内抛出异常,我想您运气不好,因为正如您已经提到的,它是在您干预之前抛出的。这隐含地表明了这种情况的发生是多么罕见,以及这个属性值是多么奇怪。

但是,您当然可以使用其他东西来使用消息。它不会尝试自动读取所有 header ,但至少会执行 JMS 的低级工作。也许Spring JMS将是一名候选人。 Camel JMS 在底层使用 Spring JMS。

Oracle 为什么要确认该消息?

对于属性JMSXGroupSeqJMS Docs

JMSXGroupID and JMSXGroupSeq are standard properties that clients should use if they want to group messages. All providers must support them. Unless specifically noted, the values and semantics of the JMSX properties are undefined.

如果值未定义,则代理无法使用任何内容进行验证。

关于java - Camel 消耗队列中的消息但无法重新创建它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59407157/

相关文章:

java - 尝试用Java创建一个旋转的转轮

java - 本书带您了解 Java API?

java - 如何使用rest dsl指定swagger参数 "format"?

java - Camel "Succesfull delivery acknowledgements"

oracle - 通过 PL/SQL 访问 HTTPS Web 服务的问题

java - Camel 路线聚合序列问题

java - Elasticsearch从1升级到5 - SearchRequestBuilder.setNoFields

java - 如何从 C++ 代码 (android) 调用非静态 Java 方法?

oracle - 如何在 Mac 上安装 Oracle Instant Client?

java - 更改序列 H2DB 的数据类型