java - RxJava : How to group arbitrary large number of items using my own boundary function

标签 java rx-java

我有一个发出字符串的可观察对象,我想按第一个字符对它们进行分组。使用 groupBy 很容易做到这一点:

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");

Observable<List<String>> groupedRows = rows.groupBy(new Func1<String, Character>() {
  public Character call(String row) {
    return row.charAt(0);
  }
}).flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() {
  public Observable<List<String>> call(GroupedObservable<Character, String> group) {
    return group.toList();
  }
});

groupedRows.toBlocking().forEach(new Action1<List<String>>() {
  public void call(List<String> group) {
    System.out.println(group);
  }
});

// Output:
// [aa, ab, ac]
// [bb, bc]
// [cc]

但这对我的目的不利,因为 groupBy 仅在源可观察对象发出 onComplete 时完成每个组。因此,如果我有很多行,它们将完全聚集在内存中,并且仅在最后一行“刷新”并写入输出。

我需要像 buffer 运算符这样的东西,但有我自己的函数,它表示每个组的边界。我是这样实现的(知道行总是按字母顺序排列):

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");

ConnectableObservable<String> connectableRows = rows.publish();

Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
  private char lastChar = 0;

  public Boolean call(String row) {
    char currentChar = row.charAt(0);
    boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
    lastChar = currentChar;
    return isNewGroup;
  }
});

Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector);

connectableRows.connect();

groupedRows.toBlocking().forEach(new Action1<List<String>>() {
  public void call(List<String> group) {
    System.out.println(group);
  }
});

// Output:
// []
// []
// []

它不起作用,因为 boundarySelector 正在“吃掉”行,我认为这很奇怪,因为我专门使用 ConnectableObservable 来表示我需要两个订阅者( boundarySelectorgroupedRows) 在 rows 开始发射之前。

奇怪的是,如果我将 rows 延迟 1 秒,那么这段代码就可以工作。

所以问题是:如何使用我自己的边界函数对任意数量的行进行分组?

最佳答案

Observable<Integer> source = Observable.range(0, 100);

source
.groupBy(k -> k / 10)
.publish(groups -> groups
        .map(g -> Pair.of(g.getKey(), g.takeUntil(groups)))
        .flatMap(kv -> 
            kv.second
            .doOnNext(v -> System.out.println(kv.first + " value " + v))
            .doOnCompleted(() -> System.out.println(kv.first + " done"))
        ))
.subscribe()
;

关于java - RxJava : How to group arbitrary large number of items using my own boundary function,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30976263/

相关文章:

java - 如何在Hadoop的Mapper中导入和使用类?

java - XCode导入使用Java Security生成的公钥文件

java - 用 Java 构建基于用户输入的动态 SQL 查询

java - 如何修复 "MissingBackpressureException"

java - 在java中使用链表乘多项式

java - 使用 MenuBar 创建框架 - pack() 无法识别大小,菜单栏被截断

java - 使用 RxJava 主题作为回调包装器

android - 等待位置,然后用 rxjava 执行改造调用

java - RxJava - 在 Observable<List<Service>> 中等待所有服务的 Observable<Boolean>

java - Spring MVC 异步方法 (RXJava)