java - 如何从 Spring Integration 中的方法返回值而不中断原始消息流?

标签 java spring spring-integration

encode fileProcessor 的方法bean负责编码视频文件。如果遇到问题,则不应删除该文件,否则如果一切正常,则可以删除。现在,保留 Message 的唯一方法不改变有效载荷的流程是使encode方法返回 void .我需要返回一些“标题”信息,这样 SI稍后可以删除该文件。我尝试使用 MessageBuilder创建一个 Message<File>并返回它,但是当它到达下一个 channel 时它已经被包装并且有一个 MessageMessage 里面,因此我的表达式无法触发删除。

我想我可以使用包装好的 Message并在对象图中向下挖掘一层,但这看起来很笨拙。

在不破坏原始消息有效负载并且不使用 SI channel 和发送污染我的 POJO 编码方法的情况下添加一些返回值的最佳方法是什么?

这是我的配置:

<!-- ########################## -->
<!-- ###      Encoding      ### -->
<!-- ########################## -->

<file:inbound-channel-adapter 
    directory="${paths.encode}"
    channel="encodeChannel"
    filename-regex="${encode.regex}"
    prevent-duplicates="false">
    <int:poller fixed-rate="5000"/>
</file:inbound-channel-adapter>

<int:service-activator
    input-channel="encodeChannel"
    output-channel="encodeResultChannel"
    ref="fileProcessor"
    method="encode">
</int:service-activator>    

<!-- This is where I'm having trouble.  -->
<!-- I don't expect this router to work. -->
<int:router
    input-channel="encodeResultChannel"
    expression="payload">       
    <int:mapping value="true" channel="encodeDeleteChannel"/>
    <int:mapping value="false" channel="stdout"/>
</int:router>

<int:service-activator
    input-channel="encodeDeleteChannel"
    expression="payload.delete()"
    output-channel="stdout">
</int:service-activator>

<stream:stdout-channel-adapter 
    id="stdout" 
    append-newline="true" />

编辑:

我正在使用:

<properties>
    <spring-framework.version>3.2.3.RELEASE</spring-framework.version>
</properties>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring-framework.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>

编辑2:

这是更新后的配置

<!-- ########################## -->
<!-- ###      Encoding      ### -->
<!-- ########################## -->

<file:inbound-channel-adapter 
    directory="${paths.encode}"
    channel="filePickupChannel"
    filename-regex="${encode.regex}"
    prevent-duplicates="false">
    <int:poller fixed-rate="5000"/>
</file:inbound-channel-adapter>

<int:header-enricher
    input-channel="filePickupChannel"
    output-channel="encodeChannel">
    <int:header name="origFile" expression="payload"/>
</int:header-enricher>

<int:service-activator
    input-channel="encodeChannel"
    output-channel="encodeResultChannel"
    ref="fileProcessor"
    method="encode">
</int:service-activator>    

<int:router
    input-channel="encodeResultChannel"
    ignore-send-failures="false"
    default-output-channel="stdout"
    expression="payload">       
    <int:mapping value="true" channel="encodeDeleteChannel"/>
    <int:mapping value="false" channel="stdout"/>
</int:router>

<int:service-activator
    input-channel="encodeDeleteChannel"
    expression="headers['origFile'].delete()"
    output-channel="stdout">
</int:service-activator>

最佳答案

您使用的是哪个版本的 Spring Integration 和 Spring Framework?

fileProcessor.encode()的签名是什么意思看起来像?

你不应该得到一个嵌套的 Message<?> , AbstractReplyProducingMessageHandler具有以下逻辑...

private Message<?> createReplyMessage(Object reply, MessageHeaders requestHeaders) {
    AbstractIntegrationMessageBuilder<?> builder = null;
    if (reply instanceof Message<?>) {
        if (!this.shouldCopyRequestHeaders()) {
            return (Message<?>) reply;
        }
        builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
    }

...

    if (this.shouldCopyRequestHeaders()) {
        builder.copyHeadersIfAbsent(requestHeaders);
    }
    return builder.build();
}

因此,如果您返回 Message<?>您的消息已返回(使用您未设置的任何入站 header 进行了增强)。

您是否将 Spring Integration 3.0.x 与 Spring Framework 4.0.x 一起使用?如果是这样,您需要小心返回 org.springframework.integration消息,不是 org.springframework.messaging消息。

如果您返回 org.springframework.messaging消息,Spring Integration 确实会将其包装在 Spring Integration 消息中。

核心消息传递类已移至 spring-messaging Spring Framework 4.0 中的模块,因此它们可用于 websockets、STOMP 等。

Spring Integration 4.0.x 现在也使用这些类,因此您不会在类路径中看到它们;避免困惑。将 Spring Integration 3.0.x 与 Spring Framework 4.0.x 结合使用时,您需要非常小心地使用正确的类。

但是,一般来说,我们不建议将框架类(例如 Message<?>)添加到您的代码中,而是使用 POJO,并且框架会处理消息传递的细节...

boolean encode(File file) {...}

如果您需要在编码后访问有效载荷,请考虑事先将其提升为 header 。

<int:header-enricher ...>
    <int:header name="origFile" expression="payload" />
</int:header-enricher>

然后使用expression="headers['origFile'].delete()编码后。

编辑:

或者,成功时返回文件(因此它成为新的负载),失败时返回 null或者抛出异常,下游流程不会执行。

关于java - 如何从 Spring Integration 中的方法返回值而不中断原始消息流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23877175/

相关文章:

java - 为 Android 使用自定义 SQLite 数据库

java - 尝试使用 Mockito 进行监视时出现异常

java - 如何在单元测试中可靠地对 JPA 实体的更新进行单元测试

java - Spring Batch 电子邮件监听器

java - 使用Autowire获取spring xml中配置的sftpChannel

java - 创建 Artifact 叠加

c# - 在 C# 或 Java 中是否有类似 Javascript Eval 的东西

java - 如何动态连接不同的SFTP服务器?

java - 哪里可以下载 java-x86-windows.dll

Spring 集成: Send response to client http inbound gateway