java - 如果有很大的延迟,如何在可观察到的项目情绪之间运行函数?

标签 java rx-java delay observable reactive-programming

我有一个 PublishSubject,它接收来自 UI 的发射:

myPublishSubject
        .map {
            ...
        }
        .doOnNext {
            // using emitted item
        }
        .timeout (...) // wait for the gap!
        .doOnNext {
            // running a function after a specific gap between two item
        }
        .subscribe()

我想在上次发出后等待特定的时间(不是 onComplete,因为它稍后会继续发出)并运行一个函数。可以理解为元素情感之间的差距。

我正在寻找类似timeout的东西,但这个方法的问题是它会错误地终止Observable。

最佳答案

您必须对 publishswitchMap 有点创意,例如:

PublishSubject<Integer> ps = PublishSubject.create();

ps.publish(o -> 
    o.mergeWith(
        o.switchMap(e -> 
             Observable.just(1).delay(200, TimeUnit.MILLISECONDS)
            .ignoreElements()
            .doOnCompleted(() -> System.out.println("Timeout action: " + e))
        )
    )
).subscribe(System.out::println);

ps.onNext(1);
ps.onNext(2);

Thread.sleep(100);

ps.onNext(3);

Thread.sleep(250);

ps.onNext(4);

Thread.sleep(250);

它的工作原理是共享源并路由到两种方式,一种是直接发射,另一种提供 switchMap,当接收到新项目时,启动延迟的 Observable并对其完成使用react(忽略原始触发元素以避免由于 mergeWith 导致的重复事件)。当宽限期内有新信号时,switchMap 将取消之前的延迟并以较新的延迟开始。

关于java - 如果有很大的延迟,如何在可观察到的项目情绪之间运行函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44392611/

相关文章:

java - 如何通过超时简化 rxjava 流

c# - 如何正确暂停/延迟 Windows 窗体应用程序

rx-java - 在 RxJava 中,我无法从 flatMap 发出 onComplete

java - 使用带参数的方法引用

java - ScheduledExecutorService#scheduleAtFixedRate 不起作用

java - 避免在创建 url 时对数据进行编码

java - 消费观察者同时发出的值

java - 如何在android中的java代码中添加时间延迟

javascript - 如何延迟 "onsubmit"事件直到加载某些内容?

java - 如何使用 Hibernate Lucene Search 进行不区分大小写的排序?