spring-integration - 如何增加消息头

标签 spring-integration dsl

Spring Integration Java DSL 中是否有修改现有消息头的方法?

我正在使用 SI Java DSL 重新实现下载重试机制,并希望在根据与限制相比的尝试次数路由消息之前,在发生故障时递增保存下载尝试的消息 header 。

我的路由基于 SI 中包含的 RouterTests 运行良好。使用 HeaderEnrichers,我可以轻松添加 header ,但我看不到修改现有 header 的方法。

谢谢

/**
 * Unit test of {@link RetryRouter}.
 * 
 * Based on {@link RouterTests#testMethodInvokingRouter2()}.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class RetryRouterTests {

    /** Failed download attempts are sent to this channel to be routed by {@link ContextConfiguration#failedDownloadRouting( ) } */
    @Autowired
    @Qualifier("failed")
    private MessageChannel failed;

    /** Retry attempts for failed downloads are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
    @Autowired
    @Qualifier("retry-channel")
    private PollableChannel retryChannel;

    /** Failed download attempts which will not be retried, are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
    @Autowired
    @Qualifier("exhausted-channel")
    private PollableChannel exhaustedChannel;

    /**
     * Unit test of {@link ContextConfiguration#failedDownloadRouting( ) } and {@link RetryRouter}.
     */
    @Test
    public void retryRouting() {

        final int limit = 2;

        for ( int attempt = 0 ; attempt <= limit + 1 ; attempt++ ){

            this.failed.send( failed( attempt, limit) );

            if ( attempt < limit){

                assertEquals( payload( attempt ) , this.retryChannel.receive( ).getPayload( ) );
                assertNull(this.exhaustedChannel.receive( 0 ) );

            }else{

                assertEquals( payload( attempt ) , this.exhaustedChannel.receive( ).getPayload( ) );
                assertNotNull( this.exhaustedChannel.receive( ).getPayload( ) );
            }
        }

    }

    private Message<String> failed( int retry , int limit ) {

        return MessageBuilder
            .withPayload(  payload( retry ) )
            .setHeader("retries", new AtomicInteger( retry ) )
            .setHeader("limit", limit)
            .build();
    }

    private String payload (int retry){
        return "retry attempt "+retry;
    }


    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public MessageChannel loggerChannel() {
            return MessageChannels.direct().get();
        }

        @Bean(name = "retry-channel")
        public MessageChannel retryChannel() {
            return new QueueChannel();
        }

        @Bean(name = "exhausted-channel")
        public MessageChannel exhaustedChannel() {
            return new QueueChannel();
        }


        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
         * and the limit to the number of attempts that may be made. Logic is in {@link RetryRouter}.
         * <p>
         * The number of download attempts already made is provided as a header {@link #attempts} from the connector doing the download, 
         * and the limit to the number of attempts is another header {@link #retryLimit} which is originally setup as
         * a header by {@link DownloadDispatcher} from retry configuration.
         * <p>
         * Messages for failed download attempts are listened to on channel {@link #failed}. 
         * Retry attempts are routed to {@link #retryChannel()}
         *  
         * @return
         */
        @Bean
        public IntegrationFlow failedDownloadRouting() {

            return IntegrationFlows.from( "failed" )

                .handle( "headers.retries.getAndIncrement()" )
                .handle( logMessage ( "failed" ) )
                .route(new RetryRouter())
                .get();
        }

        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
         * and the limit to the number of attempts that may be made. 
         * <p>
         */
        private static class RetryRouter {

            @Router
            public String routeByHeader(@Header("retries") AtomicInteger attempts , @Header("limit") Integer limit) {

                if (attempts.intValue() < limit.intValue()){
                    return "retry-channel";
                }
                return "exhausted-channel";
            }

            /** This method is not used but is required by Spring Integration otherwise application context doesn't load because of
             * {@code Caused by: java.lang.IllegalArgumentException: Target object of type 
             * [class org.springframework.integration.dsl.test.routers.RetryRouterTests$RetryRouter] has no eligible methods for handling Messages.}
             * 
             * @throws UnsupportedOperationException if called
             */
            @SuppressWarnings("unused")
            public String routeMessage(Message<?> message) {

                throw new UnsupportedOperationException( "should not be used." );
            }
        }
    }

最佳答案

有一种方法可以在不修改 header 的情况下完成您需要的操作:

.enrichHeaders(h -> h.header("downloadRetries", new AtomicInteger()))

然后当你需要增加它时你应该这样做:

.handle(m -> m.getHeaders().get("downloadRetries", AtomicInteger.class).getAndIncrement())

并且此处理作为重试服务的发布-订阅者 channel 上的第一个单向第一个订阅者。

更新

is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.

感谢您分享有关此事的配置:现在我有一个问题,您误会了。解决方案必须是这样的:

        return IntegrationFlows.from( "failed" )

            .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(m -> m.getHeaders().get("downloadRetries",
                                  AtomicInteger.class).getAndIncrement()))
            .handle( logMessage ( "failed" ) )
            .route(new RetryRouter())
            .get();
    }

我们有一个 PublishSubscribeChannel,子流中的 .subscribe() 是第一个订阅者的第一个订阅者,而 .handle( logMessage ( "failed") ) 在主流中是第二个订阅者。在第一个订阅者的工作完成之前,最后一个不会被调用。

参见 Spring Integration Reference ManualJava DSL Manual了解更多信息。

关于spring-integration - 如何增加消息头,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34693248/

相关文章:

dsl - 创建流程图的语言

Elasticsearch 的等效 SQL Where 子句

error-handling - Spring集成中的错误处理

spring-integration - MessageGroupStoreReaper 到底是做什么用的

java - Spring Integration - 具有消息存储的队列 - 如何检查数据库中的持久消息

elasticsearch - Elasticsearch查询:匹配句子数组中的单词

dsl - 如何使用 jvmmodelInferrer 在 xtext 中生成的 java 文件中添加自定义导入语句?

java - spring-boot-starter-integration 1.4.3 性能下降

java - Spring Integration 和 JPA 更新出站网关时出现循环引用错误

testing - 自动测试网页(并通过 DSL 从用例中生成)