java - Spring DSL 处理流程后?

标签 java spring-integration spring-integration-dsl

我正在尝试配置以下流程:当消息到达 Rabbit 队列时尝试获取锁定,查询远程文件服务器以获取某些文件,并将找到的每个文件发送到另一个队列并释放锁定发送所有文件后。

IntegrationFlows.from(Amqp.inboundGateway(container)
    .messageConverter(messageConverter)
)
    .filter(m -> lockService.acquire())
    .transform(m -> remoteFileTemplate.list(inputDirectory))
    .split()
    .handle(Amqp.outboundAdapter(amqpTemplate)
        .exchangeName("")
        .routingKey(routingKey)
    .aggregate()
    .handle(m -> {
      log.info("Releasing lock");
      lock.release();
    })
    .get();

问题是流程在第一个 .handle 方法之后停止(老实说,正如预期的那样),并且我不知道如何配置它来执行我想要的操作?我尝试使用 .wireTap.publishSubscribeChannel 但这使得 2 个流程彼此不依赖,并且我的锁在文件实际发送之前被释放。

如果有人能帮助我解释如何使用 DSL 修复它,那就太好了,因为我正在动态创建这些流......

  • 编辑-

我尝试在 channel 上设置拦截器:

final DirectChannel channel = new DirectChannel();
channel.setInterceptors(Collections.singletonList(new ChannelInterceptor() {
  @Override
  public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
    lockService.acquire();
    return message;
  }

  @Override
  public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
    lock.release();
  }
}));
IntegrationFlows.from(Amqp.inboundGateway(container)
    .messageConverter(messageConverter)
)
    .channel(channel)
    .transform(m -> remoteFileTemplate.list(inputDirectory))
    .split()
    .handle(Amqp.outboundAdapter(amqpTemplate)
        .exchangeName("")
        .routingKey(routingKey)
    .get();

但是这样可以获取、释放锁,然后才获取消息。我做错了什么?

  • 编辑 2 -

通过 Gitter 聊天中的帮助解决了这个问题,以防其他人遇到困难:

IntegrationFlows.from(Amqp.inboundGateway(container)
    .messageConverter(messageConverter)
)
.channel(MessageChannels.direct().interceptor(new ChannelInterceptor() {
  @Override
  public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
    lockService.acquire();
    return message;
  }

  @Override
  public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
    lockService.release();
  }
}))
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
    .exchangeName("")
    .routingKey(routingKey)
    .get());

最佳答案

拆分后的 pub/sub,一个子流上的 AMQP 处理程序,另一个子流上的聚合器应该可以正常工作。

每个消息都将在同一线程上连续调用,最后的消息会导致聚合器释放,再次在同一线程上。

话虽如此,您将需要在入站网关上进行一些 errorChannel 处理,以便在发生错误时释放锁定。

编辑

不太复杂的解决方案是自定义 ChannelInterceptor在变换之前的 channel 而不是滤波器上,将锁锁定在 preSend()并在 afterSendCompleted() 中发布(这既要求成功,也要求失败)。

关于java - Spring DSL 处理流程后?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54327106/

相关文章:

java - 当使用synchronized和 volatile 关键字时哪些变量与主内存同步

java - 自定义Spring集成出站网关实现

spring - spring中如何并发读取并处理多个文件?

spring-integration - 接收器组件在 spring 云数据流中无法使用 kafka 获取正确的数据

java - 来自 Transformer 的 Spring Integration 路由消息

java - 如何只保留一些邮件标题并删除所有其他邮件标题?

redis - spring 集成 redis 轮询器与事务

java - 保持 Java 程序无限期运行的有效方法?

java - Android JSON WebRequest - JSONException : End of input at character 0

java - 在应用程序完全重启之前不要再次显示 mainactivity