java - 取出元素直到某个字符并用 RxJava 将它们分组

标签 java kotlin rx-java rx-java2 rx-kotlin

我有一个问题的简单设置,但解决方案似乎更复杂。

设置:我有一个热可观察对象,它源自一个扫描器,它将每个数字作为不同的元素发出,并在代码完成时发出 R

问题:据此,我想要一个热可观察对象,它将每个完整代码作为 1 个元素发出。

RxMarbles example

我尝试使用不同的 flatMaptakeUntilgroupBy 运算符,但未能找到解决方案。

最佳答案

您可以使用缓冲区 运算符。

PublishSubject<Token<Integer>> s = PublishSubject.create();

Observable<Token<Integer>> markers = s.filter(x->x.isMarker());

s.buffer(markers).subscribe(
    v->{
        Optional<Integer> reduce = v.stream()
            .filter(t->!t.isMarker())
            .map(t->(ValueToken<Integer>)t)
            .map(ValueToken::get)
            .reduce((a,b)->a+b);
        reduce.ifPresent(System.out::println);
    }
);

s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker()); // will emit 25

s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // will emit 17

s.onNext(value(10));
s.onNext(value(7)); // Not emitting yet

我创建了一个类来将值和标记都包装在流中。

public abstract class Token<T> {
    private static final MarkerToken MARKER = new MarkerToken<>();

    public boolean isMarker() {
        return false;
    }

    public static <T> MarkerToken<T> marker() {
        return MARKER;
    }

    public static <T> ValueToken<T> value(T o) {
        return new ValueToken<>(o);
    }

    public static class ValueToken<T> extends Token<T> {
        T value;

        public ValueToken(T value) {
            this.value = value;
        }

        public T get() {
            return value;
        }
    }

    public static class MarkerToken<T> extends Token<T> {
        public boolean isMarker() {
            return true;
        }
    }

}

更新(使用扫描)

之前的方法也会在流关闭时发出,使用此解决方案,您可以发出完整的缓冲区。

消息类作为一个累加器,它会累加 token ,直到累加结束标记。

发生这种情况时,下一条消息将从头开始。

结束标记作为最后一个元素的存在标记消息已完成。

public static class Message<T> {
    List<Token<T>> tokens = new ArrayList<>();

    public Message<T> append(Token<T> t) {

        Message<T> mx = new Message<T>();
        if(!isComplete()) {
            mx.tokens.addAll(tokens);
        }
        mx.tokens.add(t);
        return mx;
    }

    public boolean isComplete() {
        int n = tokens.size();
        return n>0 && tokens.get(n-1).isMarker();
    }

    public Optional<List<Token<T>>> fullMessage(){
        return isComplete() ? Optional.of(tokens):Optional.empty(); 
    }
}

扫描您为发出的每个标记发出消息的源,然后过滤掉不完整的消息并只发出标记为完整的消息。

    s.scan(new Message<Integer>(), (a, b) -> a.append(b))
        .filter(Message::isComplete)
        .map(Message::fullMessage)
        .map(Optional::get).subscribe(v -> {
            System.out.println(v);
        });

    s.onNext(value(12));
    s.onNext(value(13));
    s.onNext(marker());// [V(12), V(13), MARKER]

    s.onNext(value(10));
    s.onNext(value(7));
    s.onNext(marker()); // [V(10), V(7), MARKER]



    s.onNext(value(10));
    s.onNext(value(127));

    s.onComplete(); // Not emitting incomplete messages on the closing of the subject.

关于java - 取出元素直到某个字符并用 RxJava 将它们分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56362602/

相关文章:

java - 将JAVA程序转换为PHP代码

rx-java - 为什么要使用 RxJava 中的新功能 Single?

kotlin - RxJava 如何将 Rxjava 中的 Single<T> 转换为 Retrofit 中的 Call<T> 或相反转换

java - spring hibernate多对多映射表单输入

java - 如何使用 Selenium 下载网页源

android - Kotlin Android - 从 fragment 复制到剪贴板

kotlin - RxJava Single最小执行时间

android - RxJava + Websocket - 如何将 Observable 添加到 Websocket 监听器?

java - 多线程 GAE 应用程序需要同步吗?

android - Ktor:发送响应后删除临时文件