我是 RxJava2 的新手,我遇到了一些奇怪的行为,所以很可能我以错误的方式使用了该工具。
这是一个相当大的项目,但我将下面的代码 fragment 分离为最小的可重现代码:
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = PublishSubject.create();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
此代码使用 .interval
模拟来 self 的 rx 流的事件和一个 flatMap
接收这些事件“做一些处理”并使用 Subject
将结果推向下游。
流是一个持续的过程,将有几个事件。
这个最少的代码很愚蠢,因为我只在 apply
上推送回调,但在实际情况中,有几个可能发生推送的时刻以及在 apply
期间收到的事件数量与通过主题发送的金额不同。
我希望通过这段代码看到的是:
value: 2 // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc
我实际得到的是:
value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
... etc
我也尝试过 Observable<Integer> o = s.share();
并返回,或者直接返回s.share();
结果相同。
我有点理解为什么会这样。 ObservableSource
再次订阅 n 次 n 次,所以每个循环都有更多事件。
问题:
我怎样才能实现预期的行为?
(如果我的预期行为不清楚,请在评论中询问更多)
最佳答案
您的 PublishSubject
被订阅了多次,每个项目一次来自 interval()。
编辑:您需要每次都传入一个新的PublishSubject
(如果您想保留第一个/最后一个发射,请切换到BehaviorSubject
);将其传递给长时间运行的进程,并确保在长时间运行的进程结束时正确调用其 onComplete
。
关于java - RxJava2 flatMap 创建重复事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42280054/