spring-integration - 使用 Java DSL 的流程记录请求、响应和总时间

标签 spring-integration spring-dsl

我正在尝试构建一个以 Http.inboundGateway 开头的流程 -> 执行几项操作,例如将请求数据存储到数据库、执行 header 丰富、发送到 AMQP 并返回流程状态(成功/失败) ).

我有一些事情正在苦苦挣扎,但我无法弄清楚。

1.) 记录请求和响应。

我已经设法记录了 Http.inboundGateway 收到的请求(见下文。不确定这是否是正确的方法,但它有效。请建议有更好的方法来做到这一点)。也就是说,我无法获取发送给客户端的响应消息,而且我也不知道如何计算流的事务时间并将其记录到日志文件中。如果有一种方法可以让我在每次交易后打印统计信息,如 “已收到:5,成功:4,失败:1,平均交易时间:250 毫秒 ..等”

@Bean
public IntegrationFlow httpInboundGateway()
{
    return IntegrationFlows.from(Http.inboundGateway("/httplistner")
                .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                .mappedRequestHeaders("*"))
            .transform(new ObjectToStringTransformer())
            .wireTap(flow -> flow.handle(message -> logger.info(">> Received Request from Caller.\nHeaders : "+message.getHeaders() + "\nPay Load : "+message.getPayload())))
            .channel(httpRequestChannel())
            .get();
}

2.) 如何向 Spring DSL Flow 添加日志语句?

我希望能够将日志语句(用于调试)添加到我的集成 DSL 定义中,这样我就可以查看日志文件并了解发生了什么以及出了什么问题。到目前为止,除了如上面的定义所示在流程中间添加“.wireTap”之外,我无法找到一种方法来做到这一点。如果有更好/正确的方法,请提出建议。

3.) 自定义“Http.inboundGateway”发送的响应。

我不知道如何自定义 Http.inboundGateway 在流程完成后发送回客户端的 HTTP 响应。我该怎么做,或者您能指点我可以阅读并理解如何做的文档吗?我希望使用 Spring DSL。

错误响应也是如此。如您所见,我没有向我的 Http.inboundGateway 添加错误 channel 。因此,如果在其当前配置中现在发生错误,客户端将获得 500 和完整的堆栈跟踪。我如何获取错误消息并能够根据错误构建自定义响应并将其发送给客户端。示例:如果他们向我发送了 XML 有效负载并且 XML 格式错误,我希望能够向他们发送 HTTP 400,并在响应中提供一些详细信息,表明他们的请求数据格式不正确。

最佳答案

捕获输出的技巧是 log() 后跟一个“无处可及的桥”——之所以这样命名是因为它没有输出 channel ——因此框架将结果发送回网关。

给你...

@SpringBootApplication
public class So41990546Application {

    public static void main(String[] args) {
        SpringApplication.run(So41990546Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(Http.inboundGateway("/foo")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class))
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .<String, String>transform(String::toUpperCase)
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Bridge.to.nowhere"))
                .get();
    }

    @Bean
    public IntegrationFlow errorsFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/errors")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class)
                    .errorChannel("errors.input"))
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .transform("1 / 0")
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Another.bridge.to.nowhere"))
                .get();
    }

    @Bean
    public IntegrationFlow errors() {
        return f -> f.transform("'Error: ' + payload.cause.message")
                .enrichHeaders(b -> b.header(HttpHeaders.STATUS_CODE, 400))
                .log(Level.INFO) // log the whole message so we can see the status code
                .bridge(e -> e.id("Another.b.t.n"));
    }

}

.log 是在 1.2 中添加的,并且在下面使用了窃听器。

编辑

如果您使用命名 channel ...

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(Http.inboundGateway("/foo")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class))
                .channel(namedChannel())
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .<String, String>transform(String::toUpperCase)
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Bridge.to.nowhere"))
                .get();
    }

    public MessageChannel namedChannel() {
        return new DirectChannel();
    }

并启用指标,如 the documentation 中所述你可以获得all kinds of stats from that channel which will include average elapsed time for the downstream flow .

关于spring-integration - 使用 Java DSL 的流程记录请求、响应和总时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41990546/

相关文章:

Spring Integration Java DSL - 如何使用重试建议调用 ServiceActivator 方法

java - 是否有使用 Java DSL 使用 Spring Integration 记录消息的组件?

java - Spring Integration DSL JDBC 入站 channel 适配器

java - Spring Mail Integration - 我如何将 org.springframework.messaging.Message 转换为 javax.mail.internet.MimeMessage

java - 仅第一条消息传送到服务器

java - Spring 集成和 TCP 服务器套接字 - 如何向客户端发送消息?

java - RabbitMQ + Spring 集成。队列大小1,仅在覆盖时删除

java - spring-rabbitmq 自动重试连接到代理

java - Spring Integration 在拆分迭代器时抑制异常

routes - Apache Camel 路由将值设置为 setHeader 并将该值作为输入传递给脚本