java - Observable#repeat 在响应式扩展中的奇怪行为

标签 java rx-java reactive-programming repeat sampling

我正在研究 rx 运算符并且很好奇为什么 just(null).repeat() 不能作为任何内置运算符的参数:

Observable.interval(1, TimeUnit.SECONDS)
    .sample(Observable.just(null).repeat())
    .subscribe(System.out::println);

我原以为它会打印 0 1 2 3 ... 但它只是挂起。我想这是因为 repeat 占用了默认的 Scheduler,但是,如果您交换 intervaljust-repeat 的角色 然后它按预期工作,每秒打印一次 null:

Observable.just(null).repeat()
    .sample(Observable.interval(1, TimeUnit.SECONDS))
    .subscribe(System.out::println);

这是怎么回事?

最佳答案

如果您没有指定调度程序(并且没有运算符(operator)设置调度程序),那么所有处理都发生在同一个线程上。 just(null).repeat() 将占用 100% 的 CPU 核心,因此其他任何东西都没有机会继续。

在你的例子中,interval 是在 Scedulers.computation() 调度器上产生的,因为它在开始时并且之后没有调度器发生变化,你的 repeat 也在同一个线程上工作。

在第二种情况下,所有东西都在同一个线程上被订阅,除了间隔,它在它自己的调度器上;其余部分取决于 sample 的内部实现。

如果您使用特定的调度程序,它应该可以工作:

.sample(Observable.just(null).repeat().subscribeOn(Schedulers.computation()))

请注意,如果您只想使用空值而不是 interval 生成的数字,更有效的方法是使用 map 而不是 sample:

.map(any -> (Object) null)

关于java - Observable#repeat 在响应式扩展中的奇怪行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42259673/

相关文章:

java - 如何在 RxJava 服务器轮询中对 "repeatWhen"进行单元测试

system.reactive - 使用异步模式 (queue.BeginReceive,queue.EndReceive) 为 MSMQ 消息接收使用响应式(Reactive)扩展 (Rx)

java - Mono 与 CompletableFuture

javascript - 如果没有匹配的累加器,RxJS 如何减少?

java - 在 Java 中使用 BufferedImage

Java Swing - 单用户应用程序到多用户应用程序

kotlin - RxJava 返回单,执行完之后

java - jaxb 编码器 characterEscapeHandler

java - Spring MVC(Spring 2.5)关于Post-Redirect-Get的问题

android - 使用 RxJava 转换 HashMap 值