java - 如何对并发 rxjava 执行进行单元测试

标签 java junit kotlin rx-java2 mockk

我有两个 Completable 在两个不同的线程上同时运行并访问共享资源。我想测试它们的行为,以便只有第一个运行特定的代码,第二个以错误结束。

由于我在测试中使用 Scheduler.trampoline() ,因此两个 Completable 无法同时运行,而是按顺序运行,因此我无法对该代码进行单元测试.

示例,

    val subscription = userRepository.logout().test()
    val subscriptionSimultaneous = userRepository.logout().test()


    subscription
        .assertNoErrors()
        .assertComplete()

    subscriptionSimultaneous
        .assertError(someError)
        .assertNotComplete()

    verify(exactly = 1) { somethingThatMustRunOnlyOnce() }

最佳答案

我实际上花了很多努力来实现这样的测试。

public class RxTest {
    @Test
    public void testConcurrency() {
        Logout logout = new Logout();

        AtomicInteger logoutCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        Completable logoutCompletable = Completable.fromAction(() -> logout.logout())
                .subscribeOn(Schedulers.io())
                .doOnComplete(() -> logoutCount.addAndGet(1))
                .doOnError(error -> errorCount.addAndGet(1))
                .onErrorComplete();
        int tries = 50;
        Completable[] arrayOfLogoutCompletables = new Completable[tries];
        for (int i = 0; i < tries; i++) {
            arrayOfLogoutCompletables[i] = logoutCompletable;
        }
        // run all in parallel and wait for all to finish
        Completable.mergeArray(arrayOfLogoutCompletables).blockingAwait();

        assertEquals(1,logoutCount.get());
        assertEquals(tries - 1, errorCount.get());
    }


    private static class Logout {
        private boolean loggedOut = false;

        /**
         * if you remove synchronized test will fail!!
         */
        private synchronized void logout() {
            if (loggedOut) throw new IllegalStateException();
            loggedOut = true;
        }

    }
}

测试在 Schedulers.io() 上同时运行最多 50 个 Completable,每个都调用 logout()。有一些计数器可以计算 logout() 成功和失败的次数。 blockingAwait 正在等待所有 Completables 完成。 运行此测试 100 次,如果删除 synchronized,它可能会在 20% 的时间内失败。 onErrorComplete() 是为了避免在所有 Completables 完成之前传播异常。

有趣的事实:如果您将 getter 和 setter 添加到 loggedOut 并在 logout() 中使用它,那么在没有 synchronized 的情况下,大多数情况下都会失败>.

希望对你有帮助!

关于java - 如何对并发 rxjava 执行进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53758428/

相关文章:

Java错误找不到符号

mysql - Gradle 加载 mysql-connector jar 但没有 dbunit jar 作为外部依赖项,为什么?

java - 同时使用 junit 断言和 mockito 验证

java - EasyMock/PowerMock导入问题

android - Firestore 文档引用序列化

java - 尺寸比较(周长/平方周长)等

java - 将通用类作为 Akka Streams Flow 的输入传递

java - 运行带有依赖项的 jar 时 Undertow 应用程序无法启动

javascript - Log4j2 路由附加器 JavaScript 配置错误

kotlin - kotlin 中的数据类继承?