java - 响应式(Reactive)应用程序中的事务回滚

标签 java spring transactions rx-java reactive-programming

我正在使用 RxJava 1.1 从 Spring 应用程序内部组成一个可观察序列,如下所示:

@Transaction
public Observable<Event> create(Event event) {
     return Observable.just(event)
            .flatMap(event -> {
                //save event to db (blocking JPA operation)
                Event event = eventRepository.save(event); 
                return Observable.just(event);
            })
            //async REST call to service A
            .flatMap(this::sendEventToServiceA) <---- may execute on different thread
            //async REST call to service B
            .flatMap(this::sendEventToServiceB) <---- may execute on different thread
            .doOnError( throwable -> {
                // ? rollback initally created transaction?
            })
}

一个事件从某个 Controller 类到达我的应用程序的服务层,并通过使用 RxJava 的 flatMap() 函数构建的一系列操作传播。该事件首先存储在数据库 (Spring Data) 中,接下来的两个异步 HTTP 请求在后台使用 Spring 的 AsyncRestTemplate 库一个接一个地执行。

如果在管道中的任何地方抛出错误/异常,我希望能够回滚数据库事务,以便事件不存储在数据库中。我发现这并不容易做到,因为在 Spring 中,事务上下文与特定的执行线程相关联。因此,如果代码到达不同线程上的 doOnError 回调(AsyncRestTemplate 使用其自己的 AsyncTaskExecutor),则无法回滚最初创建的事务。

您能否建议任何机制来实现跨多线程应用程序的事务,该应用程序由多个以这种方式编写的异步操作组成?

我还尝试以编程方式创建交易:

TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());

然后通过管道发送 transactionStatus 对象和事件,但是当发生错误并且我调用“platformTransactionManager.rollback(status);”时,我得到“事务同步未激活”,因为它正在运行我猜是一个不同的线程。

附注 sendEventToServiceA/sendEventToServiceB 方法看起来与此类似:

public Observable<Event> sendEventToServiceA(event) {
    ..........
    ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
              "/serviceA/create?event_id=" + event.id,
              HttpMethod.POST, requestEntity, String.class);

    return ObservableUtil.toRxObservable(listenableFuture);
}

最佳答案

这样做的一种方法是确保在与数据库保存相同的线程上观察到错误:

@Transaction
public Observable<Event> create(Event event) {

     Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
     return Observable.just(event)
            .flatMap(event -> {
                //save event to db (blocking JPA operation)
                Event event = eventRepository.save(event); 
                return Observable.just(event);
            })
            .subscribeOn(scheduler)
            //async REST call to service A
            .flatMap(this::sendEventToServiceA) <---- may execute on different thread
            //async REST call to service B
            .flatMap(this::sendEventToServiceB) <---- may execute on different thread
            .observeOn(scheduler)
            .doOnError( throwable -> {
                // ? rollback initally created transaction?
            })
}

关于java - 响应式(Reactive)应用程序中的事务回滚,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35872267/

相关文章:

java - 尝试从 Angular 向 Spring Boot API 插入数据时出现 "JSON parse error: Cannot construct instance of"错误

Spring 数据 Elasticsearch 动态更改 indexName

java - 为什么我的 spring boot @RequestMapping 不工作?

java - 有状态 EJB 生命周期问题

java - 当我们实例化一个对象时,是否会创建父类(super class)的实例?

java - 使用Java实现网络时间同步

Java single char 由多个字节表示的字符串

java - 如何将 Map 传递给 Controller ​​ throw Executions.createComponents(...) ? (ZK)

php - MYSQL X 带有事务的锁

spring - Spring事务传播和隔离