java - Spring Integration - Java 配置 - 事务感知流程

标签 java spring spring-integration spring-transactions

我想聚合来自 3 个不同端点 (@ServiceActivator) 的响应并将聚合响应保留到数据库。

我遇到以下异常

org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: c.b.bean.jpa.PersonEntity.listsOfEmails, could not initialize proxy - no Session

如何让消息流事务感知?或者我错过了什么?

以下是代码片段,

配置

@Configuration
@EnableIntegration
@ComponentScan(basePackages={"integration.endpoint", "integration.sync"})
@IntegrationComponentScan(basePackages={"integration.gateway"})
public class InfrastructureConfiguration {

    @Bean
    @Description("Entry to the messaging system through the gateway.")
    public MessageChannel requestChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends transformed message to outbound channel.")
    public MessageChannel invocationChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends handler message to aggregator channel.")
    public MessageChannel aggregatorChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends handler message to response channel.")
    public MessageChannel responseChannel(){
        return pubSubChannel();
    }

    private PublishSubscribeChannel pubSubChannel() {
        PublishSubscribeChannel pubSub = new PublishSubscribeChannel(executor());
        pubSub.setApplySequence(true);
        return pubSub;
    }

    private Executor executor() {
        return Executors.newFixedThreadPool(10);
    }
}

启动网关

@MessagingGateway(name="entryGateway", defaultRequestChannel="requestChannel")
public interface IntegrationService {
    String initiateSync(AnObject obj);
}

消息生成器:它通过获取实体并将其设置为消息的属性来转换消息,并将消息发送到 channel 。随后,@Autowired 使用的该实体在 @ServiceActivator( 3 Endpoints) 中提供服务。该实体对其关联进行延迟初始化。

@Component
public class MessageBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBuilder.class);

    @Autowired
    private ODao dao;

    @Transformer(inputChannel="requestChannel", outputChannel="invocationChannel")
    public OMessage buildMessage(Message<AnObject> msg){
        LOGGER.info("Transforming messages for ID [{}]", msg.getPayload().getId());
        OMessage om = new OMessage(msg.getPayload());
        om.buildMessage(dao);
        return om;
    }
}

端点-1

@Component
public class Handler1 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler1.class);

    @Autowired
    private service1 Service1;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler1 is called");
            rm = service1.getResponse(om);
        }else{
            LOGGER.info("Handler1 is not called");
        }
        return rm;
    }
}

端点-2

@Component
public class Handler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler2.class);

    @Autowired
    private service2 Service2;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler2 is called");
            rm = service2.getResponse(om);
        }else{
            LOGGER.info("Handler2 is not called");
        }
        return rm;
    }
}

端点-3

@Component
public class Handler3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler3.class);

    @Autowired
    private service3 Service3;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler3 is called");
            rm = service3.getResponse(om);
        }else{
            LOGGER.info("Handler3 is not called");
        }
        return rm;
    }
}

聚合器

@Component
public class MessageAggregator {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageAggregator.class);

    @Aggregator(inputChannel="aggregatorChannel", outputChannel="responseChannel")
    public Response aggregate(List<ResponseMessage> resMsg){
        LOGGER.info("Aggregating Responses");
        Response res = new Response();
        res.getResponse().addAll(resMsg);
        return res;
    }

    @ReleaseStrategy
    public boolean releaseChecker(List<Message<ResponseMessage>> resMsg) {
        return resMsg.size() ==3;
    }

    @CorrelationStrategy
    public ResponseMessage corelateBy(ResponseMessage resMsg) {
        LOGGER.info("CorrelationStrategy: message payload details {}", resMsg);
        return resMsg;
    }
}

最佳答案

您可以在 dao 层内获取对延迟加载域的引用。所以后面要用到的时候,不用代理就可以正常实例化。 例如,它可能像这样的片段:

public List<PersonEntity> fetchPersonsWithMails() {
    return sessionFactory.getCurrentSession()
        .createCriteria(PersonEntity.class)
        .setFetchMode("listsOfEmails", FetchMode.JOIN)
        .list();
}     

关于java - Spring Integration - Java 配置 - 事务感知流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30011714/

相关文章:

Mule ESB 与 Spring 集成

java - 一种重置 Spring Integration 上下文中所有消息的方法

java - 如何清理 Eclipse 中的组合 vars 声明?

java - 2D 数组的渲染距离--Minecraft 2D

java - spring.jpa.hibernate.ddl-auto = validate 属性是否验证表中的列?

java - Spring 4 安全配置运行但未激活

java - 使用两个接口(interface)声明变量

java - jsf 2.0 + springwebflow 2.0 中不受支持的异常

spring - 更改 Spring 输入验证语言

java - 套接字和 Spring 集成