Spring Integration Java DSL 中是否有修改现有消息头的方法?
我正在使用 SI Java DSL 重新实现下载重试机制,并希望在根据与限制相比的尝试次数路由消息之前,在发生故障时递增保存下载尝试的消息 header 。
我的路由基于 SI 中包含的 RouterTests 运行良好。使用 HeaderEnrichers,我可以轻松添加 header ,但我看不到修改现有 header 的方法。
谢谢
/**
* Unit test of {@link RetryRouter}.
*
* Based on {@link RouterTests#testMethodInvokingRouter2()}.
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class RetryRouterTests {
/** Failed download attempts are sent to this channel to be routed by {@link ContextConfiguration#failedDownloadRouting( ) } */
@Autowired
@Qualifier("failed")
private MessageChannel failed;
/** Retry attempts for failed downloads are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
@Autowired
@Qualifier("retry-channel")
private PollableChannel retryChannel;
/** Failed download attempts which will not be retried, are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
@Autowired
@Qualifier("exhausted-channel")
private PollableChannel exhaustedChannel;
/**
* Unit test of {@link ContextConfiguration#failedDownloadRouting( ) } and {@link RetryRouter}.
*/
@Test
public void retryRouting() {
final int limit = 2;
for ( int attempt = 0 ; attempt <= limit + 1 ; attempt++ ){
this.failed.send( failed( attempt, limit) );
if ( attempt < limit){
assertEquals( payload( attempt ) , this.retryChannel.receive( ).getPayload( ) );
assertNull(this.exhaustedChannel.receive( 0 ) );
}else{
assertEquals( payload( attempt ) , this.exhaustedChannel.receive( ).getPayload( ) );
assertNotNull( this.exhaustedChannel.receive( ).getPayload( ) );
}
}
}
private Message<String> failed( int retry , int limit ) {
return MessageBuilder
.withPayload( payload( retry ) )
.setHeader("retries", new AtomicInteger( retry ) )
.setHeader("limit", limit)
.build();
}
private String payload (int retry){
return "retry attempt "+retry;
}
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public MessageChannel loggerChannel() {
return MessageChannels.direct().get();
}
@Bean(name = "retry-channel")
public MessageChannel retryChannel() {
return new QueueChannel();
}
@Bean(name = "exhausted-channel")
public MessageChannel exhaustedChannel() {
return new QueueChannel();
}
/**
* Decides if a failed download attempt can be retried or not, based upon the number of attempts already made
* and the limit to the number of attempts that may be made. Logic is in {@link RetryRouter}.
* <p>
* The number of download attempts already made is provided as a header {@link #attempts} from the connector doing the download,
* and the limit to the number of attempts is another header {@link #retryLimit} which is originally setup as
* a header by {@link DownloadDispatcher} from retry configuration.
* <p>
* Messages for failed download attempts are listened to on channel {@link #failed}.
* Retry attempts are routed to {@link #retryChannel()}
*
* @return
*/
@Bean
public IntegrationFlow failedDownloadRouting() {
return IntegrationFlows.from( "failed" )
.handle( "headers.retries.getAndIncrement()" )
.handle( logMessage ( "failed" ) )
.route(new RetryRouter())
.get();
}
/**
* Decides if a failed download attempt can be retried or not, based upon the number of attempts already made
* and the limit to the number of attempts that may be made.
* <p>
*/
private static class RetryRouter {
@Router
public String routeByHeader(@Header("retries") AtomicInteger attempts , @Header("limit") Integer limit) {
if (attempts.intValue() < limit.intValue()){
return "retry-channel";
}
return "exhausted-channel";
}
/** This method is not used but is required by Spring Integration otherwise application context doesn't load because of
* {@code Caused by: java.lang.IllegalArgumentException: Target object of type
* [class org.springframework.integration.dsl.test.routers.RetryRouterTests$RetryRouter] has no eligible methods for handling Messages.}
*
* @throws UnsupportedOperationException if called
*/
@SuppressWarnings("unused")
public String routeMessage(Message<?> message) {
throw new UnsupportedOperationException( "should not be used." );
}
}
}
最佳答案
有一种方法可以在不修改 header 的情况下完成您需要的操作:
.enrichHeaders(h -> h.header("downloadRetries", new AtomicInteger()))
然后当你需要增加它时你应该这样做:
.handle(m -> m.getHeaders().get("downloadRetries", AtomicInteger.class).getAndIncrement())
并且此处理作为重试服务的发布-订阅者 channel 上的第一个单向第一个订阅者。
更新
is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
感谢您分享有关此事的配置:现在我有一个问题,您误会了。解决方案必须是这样的:
return IntegrationFlows.from( "failed" )
.publishSubscribeChannel(s -> s
.subscribe(f -> f
.handle(m -> m.getHeaders().get("downloadRetries",
AtomicInteger.class).getAndIncrement()))
.handle( logMessage ( "failed" ) )
.route(new RetryRouter())
.get();
}
我们有一个 PublishSubscribeChannel
,子流中的 .subscribe()
是第一个订阅者的第一个订阅者,而 .handle( logMessage ( "failed") )
在主流中是第二个订阅者。在第一个订阅者的工作完成之前,最后一个不会被调用。
参见 Spring Integration Reference Manual和 Java DSL Manual了解更多信息。
关于spring-integration - 如何增加消息头,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34693248/