java - ActiveMQ/JMS "losing"消息 - 我缺少什么?

标签 java jms activemq spring-batch

我错过了什么?

AMQ 版本 5.13.2 Java 1.8.0_74 Windows 10

给定一个简单的测试用例,传输两个对象消息,一个包含数据,另一个是数据结束标记。仅接收数据结束标记。

队列在作业开始时创建,并在作业完成后销毁。

如果我运行大量交易,我会看到大约 50% 的接收率。

日志清楚地显示接收器在第一条消息放入队列之前启动,两条消息都放入队列,但实际上只接收第二条消息。

发送者和接收者都在同一个 JVM 上。每个都有自己的 session 和连接。

连接和队列设置代码:

@Override
public void beforeJob(JobExecution jobExecution) {
    // TODO Auto-generated method stub
    try {
        jobParameters = jobExecution.getJobParameters();

        readerConnection = connectionFactory.createConnection();
        readerConnection.start();

        writerConnection = connectionFactory.createConnection();
        writerConnection.start();

        jmsQueueManagementSession = writerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        queueName = jobParameters.getString("jobName") + "." + jobExecution.getId();

        queue = jmsQueueManagementSession.createQueue(getQueueName());

    } catch (JMSException ex) {
        throw new MaxisRuntimeException(
                MaxisCodeHelperImpl.generateCode("MXAR", MXMODULE, JMS_RECEIVER_INITIALIZATION_ERROR), null);
    }

}

发件人设置代码:

@Override
public void beforeStep(StepExecution stepExecution) {

    this.stepExecution = stepExecution;
    this.setJobExecution(stepExecution.getJobExecution());
    try {
        this.connection = jmsJobExecutionListener.getWriterConnection();
        this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.messageProducer = session.createProducer(jmsJobExecutionListener.getQueue());
    } catch (JMSException ex) {
        throw new RuntimeException(ex.getMessage(), ex);
    }
}

接收器设置代码:

@Override
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
    this.setJobExecution(stepExecution.getJobExecution());
    try {
        this.connection = jmsJobExecutionListener.getReaderConnection();
        this.jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.messageConsumer = jmsSession.createConsumer(jmsJobExecutionListener.getQueue());
    }
    catch (JMSException ex)
    {
        throw new RuntimeException(ex.getMessage(), ex);
    }

}

传输代码:

    private void doSomeStuffInTransaction (final Object model) {
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                try {
                    doSomeStuff ( model );

                    ObjectMessage message = session.createObjectMessage(
                            (model.getRoot() == null)
                            ? null
                            : model.getRoot().getContents().getId());
                    messageProducer.send(message);
                    logger.debug("Sent: {}", message.toString());
                }catch (Exception e) {
                      //use this to rollback exception in case of exception
                    status.setRollbackOnly();
                    throw new RuntimeException(e.getmessage(), e);
                }   

            }});
    }   

接收者代码:

public Object read() throws Exception,
        UnexpectedInputException, ParseException,
        NonTransientResourceException {

    Object result = null;

    logger.debug("Attempting to receive message on connection: ", connection.toString());

    ObjectMessage msg = (ObjectMessage) messageConsumer.receive();
    logger.debug("Received: {}", msg.toString());
    result = msg.getObject();

    return result;
}

日志片段:

DEBUG com.mylib.SelectedDataJmsReader - Attempting to receive message on connection: 
... snip ...
*** First message ***
DEBUG org.apache.activemq.broker.region.Queue - localhost Message ID:zip-56502-1457640572818-4:2:2:1:1 sent to queue://Stuff via SQL.402
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:2214
DEBUG com.maxis.mxmove.core.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:zip-56502-1457640572818-4:2:2:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610215, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@78cb27fd, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
INFO  com.maxis.mxmove.core.SelectedDataJmsWriter - Committed 1 stuff to redo log and JMS queue

*** Second Message ***
INFO  com.maxis.mxmove.core.SourceSelectionReaderImpl - Returning empty stuff and end-of-stream placeholder.
DEBUG org.apache.activemq.broker.region.Queue - localhost Message ID:zip-56502-1457640572818-4:2:2:1:2 sent to queue://Stuff via SQL.402
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=2, pending=0 toPageIn: 1, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 0, memUsage:3155
DEBUG com.maxis.mxmove.core.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:zip-56502-1457640572818-4:2:2:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610375, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
INFO  com.maxis.mxmove.core.SelectedDataJmsWriter - Committed 1 stuff to redo log and JMS queue

*** We received the last message, not the first.  We show two enqueues, and one dequeue.. ***
DEBUG com.maxis.mxmove.core.SelectedDataJmsReader - Received: ActiveMQObjectMessage {commandId = 7, responseRequired = true, messageId = ID:zip-56502-1457640572818-4:2:2:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:zip-56502-1457640572818-4:2:2:1, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610375, arrival = 0, brokerInTime = 1457640610375, brokerOutTime = 1457640610376, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1024, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
INFO  com.maxis.mxmove.core.SelectedDataJmsReader - executed read, found end-of-stream marker, returning null
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 1, memUsage:1107

最佳答案

在接收器设置代码中,请注意 beforeStep() 方法使用 @BeforeStep 进行注释。我认为这意味着接收器被设置两次,并且可能进行了预取优化。这是经过验证的,因为日志显示两个订阅。由于我不是 JMS 的重度用户,所以我错误地认为一个是针对接收方的,而另一个是针对发送方的。

@Override
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
    this.setJobExecution(stepExecution.getJobExecution());

去掉@BeforeStep注解后,日志只显示一个订阅

DEBUG org.apache.activemq.broker.region.Queue - queue://Workorders via SQL.408, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 1370, dequeueCount: 1369, memUsage:1024

下次有人告诉我使用注入(inject)时代码有多“干净”,我可能会考虑 Martial Arts Exchange 的应用

关于java - ActiveMQ/JMS "losing"消息 - 我缺少什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35896002/

相关文章:

java - 如何将 Activity 更改为 Fragment

java - 用于异步调用的 Apache Camel

amazon-web-services - 让 SQS 死信队列与 Spring Boot 和 JMS 一起使用

tomcat - 在哪里保存 ActiveMQ 数据?

java - 当我们已经设置 -classpath 来定位该文件时,类名必须是完全限定的吗?

java - 如何让 Jetty 自动重新加载它的 xml 配置文件?

java - 从数组列表点创建多边形 - 坐标序列?

java - JMS 中的隔离级别

transactions - 将事务与 JMS (ActiveMQ) 一起使用

java - 我需要一个使用 ssl 的 java stomp 客户端库