我想聚合来自 3 个不同端点 (@ServiceActivator) 的响应并将聚合响应保留到数据库。
我遇到以下异常
org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: c.b.bean.jpa.PersonEntity.listsOfEmails, could not initialize proxy - no Session
如何让消息流事务感知?或者我错过了什么?
以下是代码片段,
配置
@Configuration
@EnableIntegration
@ComponentScan(basePackages={"integration.endpoint", "integration.sync"})
@IntegrationComponentScan(basePackages={"integration.gateway"})
public class InfrastructureConfiguration {
@Bean
@Description("Entry to the messaging system through the gateway.")
public MessageChannel requestChannel(){
return pubSubChannel();
}
@Bean
@Description("Sends transformed message to outbound channel.")
public MessageChannel invocationChannel(){
return pubSubChannel();
}
@Bean
@Description("Sends handler message to aggregator channel.")
public MessageChannel aggregatorChannel(){
return pubSubChannel();
}
@Bean
@Description("Sends handler message to response channel.")
public MessageChannel responseChannel(){
return pubSubChannel();
}
private PublishSubscribeChannel pubSubChannel() {
PublishSubscribeChannel pubSub = new PublishSubscribeChannel(executor());
pubSub.setApplySequence(true);
return pubSub;
}
private Executor executor() {
return Executors.newFixedThreadPool(10);
}
}
启动网关
@MessagingGateway(name="entryGateway", defaultRequestChannel="requestChannel")
public interface IntegrationService {
String initiateSync(AnObject obj);
}
消息生成器:它通过获取实体并将其设置为消息的属性来转换消息,并将消息发送到 channel 。随后,@Autowired 使用的该实体在 @ServiceActivator( 3 Endpoints) 中提供服务。该实体对其关联进行延迟初始化。
@Component
public class MessageBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageBuilder.class);
@Autowired
private ODao dao;
@Transformer(inputChannel="requestChannel", outputChannel="invocationChannel")
public OMessage buildMessage(Message<AnObject> msg){
LOGGER.info("Transforming messages for ID [{}]", msg.getPayload().getId());
OMessage om = new OMessage(msg.getPayload());
om.buildMessage(dao);
return om;
}
}
端点-1
@Component
public class Handler1 {
private static final Logger LOGGER = LoggerFactory.getLogger(Handler1.class);
@Autowired
private service1 Service1;
@Override
@ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
public ResponseMessage handle(Message<OMessage> msg) {
OMessage om = msg.getPayload();
ResponseMessage rm = null;
if(map.get("toProceed")){
LOGGER.info("Handler1 is called");
rm = service1.getResponse(om);
}else{
LOGGER.info("Handler1 is not called");
}
return rm;
}
}
端点-2
@Component
public class Handler2 {
private static final Logger LOGGER = LoggerFactory.getLogger(Handler2.class);
@Autowired
private service2 Service2;
@Override
@ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
public ResponseMessage handle(Message<OMessage> msg) {
OMessage om = msg.getPayload();
ResponseMessage rm = null;
if(map.get("toProceed")){
LOGGER.info("Handler2 is called");
rm = service2.getResponse(om);
}else{
LOGGER.info("Handler2 is not called");
}
return rm;
}
}
端点-3
@Component
public class Handler3 {
private static final Logger LOGGER = LoggerFactory.getLogger(Handler3.class);
@Autowired
private service3 Service3;
@Override
@ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
public ResponseMessage handle(Message<OMessage> msg) {
OMessage om = msg.getPayload();
ResponseMessage rm = null;
if(map.get("toProceed")){
LOGGER.info("Handler3 is called");
rm = service3.getResponse(om);
}else{
LOGGER.info("Handler3 is not called");
}
return rm;
}
}
聚合器
@Component
public class MessageAggregator {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageAggregator.class);
@Aggregator(inputChannel="aggregatorChannel", outputChannel="responseChannel")
public Response aggregate(List<ResponseMessage> resMsg){
LOGGER.info("Aggregating Responses");
Response res = new Response();
res.getResponse().addAll(resMsg);
return res;
}
@ReleaseStrategy
public boolean releaseChecker(List<Message<ResponseMessage>> resMsg) {
return resMsg.size() ==3;
}
@CorrelationStrategy
public ResponseMessage corelateBy(ResponseMessage resMsg) {
LOGGER.info("CorrelationStrategy: message payload details {}", resMsg);
return resMsg;
}
}
最佳答案
您可以在 dao 层内获取对延迟加载域的引用。所以后面要用到的时候,不用代理就可以正常实例化。 例如,它可能像这样的片段:
public List<PersonEntity> fetchPersonsWithMails() {
return sessionFactory.getCurrentSession()
.createCriteria(PersonEntity.class)
.setFetchMode("listsOfEmails", FetchMode.JOIN)
.list();
}
关于java - Spring Integration - Java 配置 - 事务感知流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30011714/