spring - 如何在 Spring Integration 自定义消息处理程序中 Autowiring bean?

标签 spring spring-integration

我希望创建一个自定义消息处理程序以在流程中使用检查点。此外,这些检查点将存储在 ElasticSearch 中。

我创建了一个类检查点:

@Component
public class Checkpoint {

    public static final String TASK_HEADER_KEY = "task";

    public static CheckpointMessageHandlerSpec warn(String message) {
        return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message));
    }
}
// ... methods omitted: error, info etc

接下来我创建了CheckpointMessageHandlerSpec:

public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> {

    public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) {
        this.target = checkpointHandler;
    }

    public CheckpointMessageHandlerSpec apply(Message<?> message) {
        this.target.handleMessage(message);
        return _this();
    }

    @Override
    protected CheckpointHandler doGet() {
        throw new UnsupportedOperationException();
    }
}

CheckpointHandler,在这个类中我希望注入(inject)一些东西,比如来自 Spring Data 的服务或存储库:

public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler {

    private String status;
    private String message;

    // I want inject services or repositories here

    public CheckpointHandler(String status, String message) {
        this.status = status;
        this.message = message;
    }

    @Override
    public void handleMessage(Message<?> message) {
        // Test to watch if I have the bean factory. It is always null
        this.getBeanFactory();

        Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'");

        // Here I intend to persist information of payload/headers with spring-data-elasticSearch repository previously injected   
        Object obj = expression.getValue(message);
    }
}

最后是一个在流程中使用的示例:

@Bean
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) {
    return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow"))
            .enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName))
            .handle(new AppendMessageHandler())
            .wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)))
            .handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload()))
            .get();
}

private class AppendMessageHandler implements GenericHandler {

    @Override
    public String handle(Object payload, Map headers) {
        return new StringBuilder().append(testMessage).toString();
    }
}

我想念什么?可以这样做吗?我在这个问题之后有了这个想法How to create custom component and add it to flow in spring java dsl?

谢谢!

最佳答案

Bean 可以 Autowiring ,如果它们确实是 Bean。

让我们再看一下您的代码!

c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m))

这里真正的 Bean 正是 Lambda :)。当然,很遗憾,但不是您的自定义工厂以及后续的 apply() 。您的自定义代码将在目标 Lambda 中针对每条传入消息准确调用,但不了解 BeanFactory

要解决您的问题,您应该按原样使用您的工厂:

.wireTap(c -> c.handle(checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '")))

框架将负责将您的 CheckpointHandler 注册为 bean,从而实现 Autowiring 。

您可能已经猜到您不需要该 apply() 方法。只是因为 Java DSL 为 bean 填充树时需要区分组装阶段。初始化和注册阶段,框架解析该树,并将 bean 注册到应用程序上下文中。最后,还有一个运行时阶段,此时消息通过所有这些消息处理程序、转换器等从一个 channel 传输到另一个 channel 。

关于spring - 如何在 Spring Integration 自定义消息处理程序中 Autowiring bean?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39111470/

相关文章:

java - 如何使用 Tomcat 8 + Spring Boot + Maven

spring - 如何使用mockito模拟bean进行camel路由测试spring bean

java - JUnit:我可以保留 spring 上下文的公共(public)部分吗?

Java - 用于保留的分布式 JPA 锁

java - Spring Integration FileHeader.OriginalFile 被忽略

java - Spring集成UDP服务器不监听

java - Spring/Hibernate 实体管理 Web 界面/UI

java - 如何在 spring-integration 中模拟 InboundAdapter?

java - Spring cvc复合型.2.4.a : Invalid content was found starting with element 'channel'

java - 如何忽略 Spring Integration 路由器中的 Channel ResolutionException?