java - Spring Integration AMQP - 消息偶尔未确认,需要超时吗?

标签 java spring-integration spring-amqp spring-rabbit

我有一个入站 RabbitMQ channel 适配器,每天成功处理 3000 条消息,但是偶尔我会在 RabbitMQ 管理控制台中看到未确认的消息计数为 1。这似乎仍然如此。

我确实有一个重试建议链,可以重试 3 次,然后通过死信路由键转移到 DLQ,这对于大多数异常(exception)情况都效果很好。

过去几周内发生了两次未确认的情况,其中一次我能够进行线程转储并看到 int-http:outbound-gateway 调用被卡住等待 http 响应 getStatusCode()

我在 int-amqp:inbound-channel-adapter 上有一个 receive-timeout="59000",我希望它会在超过超时的任何地方使线程超时?

我现在注意到 int-http:outbound-gateway 上有一个回复超时属性,我应该设置它吗?

有什么想法值得赞赏吗?

    <int-amqp:inbound-channel-adapter id="amqpInCdbEvents"  channel="eventsAMQPChannel" channel-transacted="true"  transaction-manager="transactionManager" 
            queue-names="internal.events.queue" connection-factory="connectionFactory" 
            receive-timeout="59000" concurrent-consumers="${eventsAMQPChannel.concurrent-consumers}" 
            advice-chain="retryChain" auto-startup="false" /> 

     <int:channel id="eventsAMQPChannel" />

     <!--  CHAIN of processing for Asynch Processing of Events from intermediate Queue   -->
    <int:chain id="routeEventChain" input-channel="eventsAMQPChannel">
            <int:json-to-object-transformer type="xx.xx.xx.json.Event" object-mapper="springJacksonObjectMapper"/>
            <int:header-enricher>
                  <int:header name="originalPayload" expression="payload" overwrite="true"/>
                  <int:header name="message_id"      expression="payload.id" overwrite="true"/>
            </int:header-enricher>  
            <int:router expression="payload.eventType">
                <int:mapping value="VALUE"              channel="valueEventChannel"/>
                <int:mapping value="SWAP"               channel="swapEventChannel"/>
            </int:router>
    </int:chain>

    <int:channel id="valueEventChannel" />
    <int:channel id="swapEventChannel" />

    <int:chain id="valueEventChain" input-channel="valueEventChannel" output-channel="nullChannel">
            <int:transformer ref="syncValuationTransformer" />
            <int:object-to-json-transformer object-mapper="springJacksonObjectMapper" />
            <int:header-enricher>
                     <int:header name="contentType" value="application/json;charset=UTF-8" overwrite="true"/>
            </int:header-enricher>                  
            <int-http:outbound-gateway id="httpOutboundGatewayValuationServiceFinalValuation"                                                           
                    expected-response-type="java.lang.String"
                    http-method="POST"      charset="UTF-8"
                    extract-request-payload="true"                                          
                    url="${value.service.uri}/value"/>
    </int:chain>

最佳答案

reply-timeout 是将回复发送到回复 channel 时的超时(如果它可以阻塞 - 例如,有界队列 channel 已满)。

int-http:outbound-gateway call was stuck waiting for a http response getStatusCode()

您在可以配置到出站适配器中的 ClientHttpRequestFactory 上设置客户端超时 (readtimeout)...

/**
 * Create a new instance of the {@link RestTemplate} based on the given {@link ClientHttpRequestFactory}.
 * @param requestFactory HTTP request factory to use
 * @see org.springframework.http.client.SimpleClientHttpRequestFactory
 * @see org.springframework.http.client.HttpComponentsClientHttpRequestFactory
 */
public RestTemplate(ClientHttpRequestFactory requestFactory) {
    this();
    setRequestFactory(requestFactory);
}

关于java - Spring Integration AMQP - 消息偶尔未确认,需要超时吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43496883/

相关文章:

java - Netbeans 中 Ubuntu 12.04 下 Java 中缺少 "org"和 "com"软件包

java - tomcat 中的 WAR 部署

spring-integration - Spring Integration 同步/异步消息流

java - 在 Spring Boot 上动态设置 Spring AMQP 和 RabbitMQ 的主机

java - 由于发送的 download.oracle.com HTTP 请求抛出 404 错误,无法安装 Java9

java - 从服务器弃用了 getPlayer()

JAVA - 当新文件到达ftp服务器时如何触发

java - 如何使用wireTap传递 header ?

spring-boot - 将 Spring AMQP 消费者/生产者服务迁移到 Spring Stream 源时出现问题

java - RabbitMQ队列问题。消息监听器不消费前两条消息