java - 回顾一下RxJava中的链条

标签 java reactive-programming rx-java

Random rand = new Random();
Observable<Integer> random1 = Observable.just(rand.nextInt());
Observable<Integer> random2 = random1.flatMap(r1->Observable.just(r1 * rand.nextInt()));
random2.subscribe(System.out::println);

上面的代码只是获取几个随机数,将它们相乘,然后打印输出。

这是我的问题:在打印结果时,是否有一种优雅的方式可以获取第一个随机值?请注意,我的实际系统正在进行异步调用,而不仅仅是生成随机数。

以下是一些我认为不优雅的示例:

Random rand = new Random();
Observable<Integer> random1 = Observable.just(rand.nextInt());
random1.subscribe(r1->{
  Observable<Integer> random2 = Observable.just(r1 * rand.nextInt());
  random2.subscribe(r2->{
    System.out.println(r1);
    System.out.println(r2);
  });
});

.

Random rand = new Random();
Observable<Integer> random1 = Observable.just(rand.nextInt());
Observable<int[]> result = random1.flatMap(r1->{
  int[] pair = new int[2];
  pair[0] = r1;
  pair[1] = r1 * rand.nextInt();
  return Observable.just(pair);
});
result.subscribe(pair-> {
  System.out.println(pair[0]);
  System.out.println(pair[1]);
});

.

Random rand = new Random();
int[] hack = new int[1];
Observable<Integer> random1 = Observable.just(rand.nextInt()).doOnNext(r1->hack[0]=r1);
Observable<Integer> random2 = random1.flatMap(r1->Observable.just(r1 * rand.nextInt()));
random2.subscribe(r2->{
  System.out.println(hack[0]);
  System.out.println(r2);
});

最后,我不确定这是否是一个好的做法:

Random rand = new Random();
Observable<Integer> random1 = Observable.just(rand.nextInt());
Observable<Integer> random2 = random1.flatMap(r1->Observable.just(r1 * rand.nextInt()));
random2.subscribe(r2-> System.out.println(random1.toBlocking().first()));

最佳答案

有一个 flatMap 重载,可让您指定一个 Func2,该 Func2 接收源值以及来自 Observable 的每个值,并为其进行扁平化:

Random rand = new Random();
Observable<Integer> random1 = Observable.just(rand.nextInt());
Observable<List<Integer>> random2 = random1.flatMap(
    r1 -> Observable.just(r1 * rand.nextInt()),
    (r1, r2) -> Arrays.asList(r1, r2));

random2.subscribe(System.out::println);

关于java - 回顾一下RxJava中的链条,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28510740/

相关文章:

android - RxJava 分别和一起订阅多个 observable

java - Spring Boot 应用程序在启动时终止

java - 使用 <<""//?? 创建字符串字符 Android java

javascript - 缓冲条形码扫描仪的击键

android - 如何在没有 subscribe() 的情况下处理 BlockingObservable 中的错误?

reactive-programming - 使用 RxJava 实现类似十字转门的运算符

java - 如何在ajax成功函数中从java获取单值响应

java - 按顺序显示链接列表,并在当前位置添加星号

python - RxPY 中带有 from_iterable/range 的 subscribe_on

javascript - 如何使用 rxjs 和 redux-observable 等待或监听 Url 已更改?