java - Spring 响应式(Reactive)事务在取消时提交,产生部分提交

标签 java spring reactive-programming spring-data-mongodb spring-transactions

我的项目使用spring-data-mongodb,一切都是响应式(Reactive)的。有一个 bean 具有使用声明性事务的事务方法。相关代码片段如下:

@Configuration
public class Config {

    @Bean
    public ReactiveMongoTransactionManager reactiveMongoTransactionManager() {
        return new ReactiveMongoTransactionManager(reactiveMongoDbFactory());
    }

    ...
}

@Service
public class MyService {
    private final ReactiveMongoOperations mongoOperations;

    ...

    @Transactional
    public Mono<User> saveUser(User user) {
        return mongoOperations.insert(user).then(anotherInsertOnMongoOperations()).thenReturn(user);
    }
}

这里没有什么异常。

我可以在日志中看到事务在插入文档之前开始,然后提交:

DEBUG o.s.d.m.ReactiveMongoTransactionManager -  About to start transaction for session [ClientSessionImpl@62de8058 id = {"id": {"$binary": "fye2h5JkRh6yL3MTqtC0Xw==", "$type": "04"}}, causallyConsistent = true, txActive = false, txNumber = 1, error = d != java.lang.Boolean].
DEBUG o.s.d.m.ReactiveMongoTransactionManager -  Started transaction for session [ClientSessionImpl@62de8058 id = {"id": {"$binary": "fye2h5JkRh6yL3MTqtC0Xw==", "$type": "04"}}, causallyConsistent = true, txActive = true, txNumber = 2, error = d != java.lang.Boolean].

...插入后面,然后...

DEBUG o.s.d.m.ReactiveMongoTransactionManager -  Initiating transaction commit
DEBUG o.s.d.m.ReactiveMongoTransactionManager -  About to commit transaction for session [ClientSessionImpl@62de8058 id = {"id": {"$binary": "fye2h5JkRh6yL3MTqtC0Xw==", "$type": "04"}}, causallyConsistent = true, txActive = true, txNumber = 2, error = d != java.lang.Boolean].

但有时,正如我从数据库的内容中看到的,只有第一个插入被持久化,而第二个插入则丢失。在尝试对情况进行建模后,我发现当整个 react 管道被取消时,就会发生这种“损失”(不是每次,但我能够生成一个以高概率重现这种情况的测试)。

我在我的方法的最终运算符之后添加了 .doOnSuccessOrError().doOnCancel() 以及一些日志记录。在“正常”情况下(没有取消),doOnSuccessOrError 成功记录。但是当发生取消时,有时日志中的事件顺序如下:

  1. 交易已启动
  2. 发生插入
  3. 发生取消
  4. 最终的 doOnSuccessOrError() 没有记录任何内容,并且在 onCancel() 中记录了一些内容(因此取消似乎是发生在业务方法执行的“中间”)
  5. ...但是事务仍然会提交!

TransactionAspectSupport.ReactiveTransactionSupport 包含以下代码(用于本例):

                            return Mono.<Object, ReactiveTransactionInfo>usingWhen(
                                    Mono.just(it),
                                    txInfo -> {
                                        try {
                                            return (Mono<?>) invocation.proceedWithInvocation();
                                        }
                                        catch (Throwable ex) {
                                            return Mono.error(ex);
                                        }
                                    },
                                    this::commitTransactionAfterReturning,
                                    (txInfo, err) -> Mono.empty(),
                                    this::commitTransactionAfterReturning)

最后一个参数是 onCancel 处理程序。

这意味着在取消时,事务实际上会被提交。

问题是:为什么?当由于响应式(Reactive)管道外部的原因而发生取消时,事务内的某些操作可能已完成,而有些操作尚未完成(并且永远不会完成)。在这样的时刻提交会产生部分提交,这违反了原子性要求。

启动回滚似乎更合乎逻辑。但我认为 spring-tx 的作者是故意这样做的。请问,这是什么原因?

附注为了验证我的观点,我修补了 spring-tx 5.2.3(顺便说一句,这是项目使用的版本),代码如下所示:

                            return Mono.<Object, ReactiveTransactionInfo>usingWhen(
                                    Mono.just(it),
                                    txInfo -> {
                                        try {
                                            return (Mono<?>) invocation.proceedWithInvocation();
                                        }
                                        catch (Throwable ex) {
                                            return Mono.error(ex);
                                        }
                                    },
                                    this::commitTransactionAfterReturning,
                                    (txInfo, err) -> Mono.empty(),
                                    this::rollbackTransactionDueToCancel)

    private Mono<Void> rollbackTransactionDueToCancel(@Nullable ReactiveTransactionInfo txInfo) {
        if (txInfo != null && txInfo.getReactiveTransaction() != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Rolling transaction back for [" + txInfo.getJoinpointIdentification() + "] due to cancel");
            }
            return txInfo.getTransactionManager().rollback(txInfo.getReactiveTransaction());
        }
        return Mono.empty();
    }

(基本上,只是将取消行为更改为回滚),并且通过此补丁,我的测试不再产生任何不一致的数据。

最佳答案

事实证明,响应式 Spring 事务确实有可能由于意外取消而半途提交: https://github.com/spring-projects/spring-framework/issues/25091

该问题是由“取消提交”政策引起的。 Spring 人员计划在 Spring 5.3 中将其切换为“取消时回滚”策略。目前,选项有:

  1. 如果您的某些事务包含多个写入,请使用自定义构建的 spring-tx 库并进行如下修复 https://github.com/rpuch/spring-framework/commit/95c2872c0c3a8bebec06b413001148b28bc78f2a切换到“取消时回滚”策略以避免此类令人不快的意外情况。但这意味着完全有效的 Reactor 操作符(使用取消信号作为其正常功能的一部分)将在事务操作符的下游变得不可用(因为它们定期发出的取消操作将回滚事务)。
  2. 如果您的所有事务每次最多写入一次,那么您可以安全地使用未打补丁的 Spring 版本。但请注意,Spring 人员(目前)将在 5.3 中翻转该策略。

这是一篇关于此事的文章:https://blog.rpuch.com/2020/05/25/spring-reactive-transactions-atomicity-violation.html (免责声明:我是本文的作者)。

关于java - Spring 响应式(Reactive)事务在取消时提交,产生部分提交,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61822249/

相关文章:

java - 在整个应用程序中使用的常量在哪里保存?

java:返回一个集合

java - RxJava : combine two optional observables

java - 如何实现用户以安全的方式发布一些 html 格式的数据的可能性?

Java ReplaceAll 在替换中使用找到的模式

java - 如何使用 spring mvc + jackson 将嵌套的 json 对象发送到服务器

java - Spring 集成 LastModifiedFileListFilter 不起作用

java - jackson 循环依赖

stream - Dart Streams API(使用 rxdart)与其他响应式库(如 RxJava 和 RxJS)之间的主要区别是什么?

asynchronous - 如何使用千分尺计时器记录异步方法的持续时间(返回Mono或Flux)