我希望创建一个自定义消息处理程序以在流程中使用检查点。此外,这些检查点将存储在 ElasticSearch 中。
我创建了一个类检查点:
@Component
public class Checkpoint {
public static final String TASK_HEADER_KEY = "task";
public static CheckpointMessageHandlerSpec warn(String message) {
return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message));
}
}
// ... methods omitted: error, info etc
接下来我创建了CheckpointMessageHandlerSpec:
public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> {
public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) {
this.target = checkpointHandler;
}
public CheckpointMessageHandlerSpec apply(Message<?> message) {
this.target.handleMessage(message);
return _this();
}
@Override
protected CheckpointHandler doGet() {
throw new UnsupportedOperationException();
}
}
CheckpointHandler,在这个类中我希望注入(inject)一些东西,比如来自 Spring Data 的服务或存储库:
public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler {
private String status;
private String message;
// I want inject services or repositories here
public CheckpointHandler(String status, String message) {
this.status = status;
this.message = message;
}
@Override
public void handleMessage(Message<?> message) {
// Test to watch if I have the bean factory. It is always null
this.getBeanFactory();
Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'");
// Here I intend to persist information of payload/headers with spring-data-elasticSearch repository previously injected
Object obj = expression.getValue(message);
}
}
最后是一个在流程中使用的示例:
@Bean
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) {
return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow"))
.enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName))
.handle(new AppendMessageHandler())
.wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)))
.handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload()))
.get();
}
private class AppendMessageHandler implements GenericHandler {
@Override
public String handle(Object payload, Map headers) {
return new StringBuilder().append(testMessage).toString();
}
}
我想念什么?可以这样做吗?我在这个问题之后有了这个想法How to create custom component and add it to flow in spring java dsl?
谢谢!
最佳答案
Bean 可以 Autowiring ,如果它们确实是 Bean。
让我们再看一下您的代码!
c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m))
这里真正的 Bean 正是 Lambda :)。当然,很遗憾,但不是您的自定义工厂以及后续的 apply() 。您的自定义代码将在目标 Lambda 中针对每条传入消息准确调用,但不了解 BeanFactory
。
要解决您的问题,您应该按原样使用您的工厂:
.wireTap(c -> c.handle(checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '")))
框架将负责将您的 CheckpointHandler
注册为 bean,从而实现 Autowiring 。
您可能已经猜到您不需要该 apply()
方法。只是因为 Java DSL 为 bean 填充树时需要区分组装阶段。初始化和注册阶段,框架解析该树,并将 bean 注册到应用程序上下文中。最后,还有一个运行时阶段,此时消息通过所有这些消息处理程序、转换器等从一个 channel 传输到另一个 channel 。
关于spring - 如何在 Spring Integration 自定义消息处理程序中 Autowiring bean?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39111470/