我正在使用 RxJava 1.1 从 Spring 应用程序内部组成一个可观察序列,如下所示:
@Transaction
public Observable<Event> create(Event event) {
return Observable.just(event)
.flatMap(event -> {
//save event to db (blocking JPA operation)
Event event = eventRepository.save(event);
return Observable.just(event);
})
//async REST call to service A
.flatMap(this::sendEventToServiceA) <---- may execute on different thread
//async REST call to service B
.flatMap(this::sendEventToServiceB) <---- may execute on different thread
.doOnError( throwable -> {
// ? rollback initally created transaction?
})
}
一个事件从某个 Controller 类到达我的应用程序的服务层,并通过使用 RxJava 的 flatMap() 函数构建的一系列操作传播。该事件首先存储在数据库 (Spring Data) 中,接下来的两个异步 HTTP 请求在后台使用 Spring 的 AsyncRestTemplate 库一个接一个地执行。
如果在管道中的任何地方抛出错误/异常,我希望能够回滚数据库事务,以便事件不存储在数据库中。我发现这并不容易做到,因为在 Spring 中,事务上下文与特定的执行线程相关联。因此,如果代码到达不同线程上的 doOnError 回调(AsyncRestTemplate 使用其自己的 AsyncTaskExecutor),则无法回滚最初创建的事务。
您能否建议任何机制来实现跨多线程应用程序的事务,该应用程序由多个以这种方式编写的异步操作组成?
我还尝试以编程方式创建交易:
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
然后通过管道发送 transactionStatus 对象和事件,但是当发生错误并且我调用“platformTransactionManager.rollback(status);”时,我得到“事务同步未激活”,因为它正在运行我猜是一个不同的线程。
附注 sendEventToServiceA/sendEventToServiceB 方法看起来与此类似:
public Observable<Event> sendEventToServiceA(event) {
..........
ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
"/serviceA/create?event_id=" + event.id,
HttpMethod.POST, requestEntity, String.class);
return ObservableUtil.toRxObservable(listenableFuture);
}
最佳答案
这样做的一种方法是确保在与数据库保存相同的线程上观察到错误:
@Transaction
public Observable<Event> create(Event event) {
Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
return Observable.just(event)
.flatMap(event -> {
//save event to db (blocking JPA operation)
Event event = eventRepository.save(event);
return Observable.just(event);
})
.subscribeOn(scheduler)
//async REST call to service A
.flatMap(this::sendEventToServiceA) <---- may execute on different thread
//async REST call to service B
.flatMap(this::sendEventToServiceB) <---- may execute on different thread
.observeOn(scheduler)
.doOnError( throwable -> {
// ? rollback initally created transaction?
})
}
关于java - 响应式(Reactive)应用程序中的事务回滚,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35872267/