java - RxJava2 flatMap 创建重复事件

标签 java android rx-java rx-java2

我是 RxJava2 的新手,我遇到了一些奇怪的行为,所以很可能我以错误的方式使用了该工具。

这是一个相当大的项目,但我将下面的代码 fragment 分离为最小的可重现代码:

Observable
  .interval(333, TimeUnit.MILLISECONDS)
  .flatMap(new Function<Long, ObservableSource<Integer>>() {
    private Subject<Integer> s = PublishSubject.create();
    private int val = 0;

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
      val++;
      s.onNext(val);
      return s;
      }
    })
  .subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
      Log.w("value: %s", integer);
     }
  });

此代码使用 .interval 模拟来 self 的 rx 流的事件和一个 flatMap接收这些事件“做一些处理”并使用 Subject将结果推向下游。

流是一个持续的过程,将有几个事件。

这个最少的代码很愚蠢,因为我只在 apply 上推送回调,但在实际情况中,有几个可能发生推送的时刻以及在 apply 期间收到的事件数量与通过主题发送的金额不同。

我希望通过这段代码看到的是:

value: 2  // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc

我实际得到的是:

value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
 ... etc

我也尝试过 Observable<Integer> o = s.share();并返回,或者直接返回s.share();结果相同。

我有点理解为什么会这样。 ObservableSource再次订阅 n 次 n 次,所以每个循环都有更多事件。

问题:

我怎样才能实现预期的行为?

(如果我的预期行为不清楚,请在评论中询问更多)

最佳答案

您的 PublishSubject 被订阅了多次,每个项目一次来自 interval()。

编辑:您需要每次都传入一个新的PublishSubject(如果您想保留第一个/最后一个发射,请切换到BehaviorSubject);将其传递给长时间运行的进程,并确保在长时间运行的进程结束时正确调用其 onComplete

关于java - RxJava2 flatMap 创建重复事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42280054/

相关文章:

java - IntelliJ 15 中令人讨厌的宽左间距

android - 如何为导航栏图标设置边距?

android - 捕获 MVVMCross 中的 EditText 更改

android - 为什么 RxJava 经常与 Retrofit 一起使用?

java - 如何自动删除子实体?

java - 调整图像大小以适合 JLabel

java - Jersey Web 应用程序未部署在 tomcat 上

android - 使用枢轴点缩放 Canvas 后,x 和 y 坐标错误

rx-java - 我们如何使用 RxJava 进行分页 Web 服务调用,其中每个页面都依赖于前一个页面的响应,而无需递归?

java - 使用 RxJava 进行改造时处理额外的 observable