我正在尝试配置以下流程:当消息到达 Rabbit 队列时尝试获取锁定,查询远程文件服务器以获取某些文件,并将找到的每个文件发送到另一个队列并释放锁定发送所有文件后。
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.filter(m -> lockService.acquire())
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.aggregate()
.handle(m -> {
log.info("Releasing lock");
lock.release();
})
.get();
问题是流程在第一个 .handle
方法之后停止(老实说,正如预期的那样),并且我不知道如何配置它来执行我想要的操作?我尝试使用 .wireTap
和 .publishSubscribeChannel
但这使得 2 个流程彼此不依赖,并且我的锁在文件实际发送之前被释放。
如果有人能帮助我解释如何使用 DSL 修复它,那就太好了,因为我正在动态创建这些流......
- 编辑-
我尝试在 channel 上设置拦截器:
final DirectChannel channel = new DirectChannel();
channel.setInterceptors(Collections.singletonList(new ChannelInterceptor() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
lockService.acquire();
return message;
}
@Override
public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
lock.release();
}
}));
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.channel(channel)
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.get();
但是这样可以获取、释放锁,然后才获取消息。我做错了什么?
- 编辑 2 -
通过 Gitter 聊天中的帮助解决了这个问题,以防其他人遇到困难:
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.channel(MessageChannels.direct().interceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
lockService.acquire();
return message;
}
@Override
public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
lockService.release();
}
}))
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.get());
最佳答案
拆分后的 pub/sub,一个子流上的 AMQP 处理程序,另一个子流上的聚合器应该可以正常工作。
每个消息都将在同一线程上连续调用,最后的消息会导致聚合器释放,再次在同一线程上。
话虽如此,您将需要在入站网关上进行一些 errorChannel 处理,以便在发生错误时释放锁定。
编辑
不太复杂的解决方案是自定义 ChannelInterceptor
在变换之前的 channel 而不是滤波器上,将锁锁定在 preSend()
并在 afterSendCompleted()
中发布(这既要求成功,也要求失败)。
关于java - Spring DSL 处理流程后?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54327106/