rx-java - 将 Observables 与反馈合并,将 Observable 与其自身合并

标签 rx-java system.reactive reactive-programming rx-android

我需要创建Observable,它将从不同的来源(其他Observables)收集信息,每个来源都会对事件值产生影响,但该值仍然是基于先前的值(一种状态机)构建的。

我们有带有 int 值和操作代码的消息:

class Message{
    Integer value;
    String operation;

    public Message(Integer value, String operation) {
        this.value = value;
        this.operation = operation;
    }
}

以及此类值的一些来源,以及初始值:

Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));

现在有事件源了。这些源将发出值,新的dynamicMessage值将根据该值以及先前的dynamicMessage值而出现。实际上它的事件类型是:

class Changer1 {
    String operation;

    public Changer1(String operation) {
        this.operation = operation;
    }
}


class Changer2 {
    Integer value;

    public Changer2(Integer value) {
        this.value = value;
    }
}

Changer1 负责更改操作。 Changer2 负责改变值。

以及这些值的来源:

static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
        .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
                .delay(2, TimeUnit.SECONDS));


static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
        .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
                .delay(2, TimeUnit.SECONDS));

现在我需要更改dynamicMessage Observable 并接收来自更改者的消息。这是一个图表: state machine

我还尝试编写一个程序,我如何看待解决方案,但它因 OutOfMemoryException 挂起,就在第一个值之后:
消息{value=1, 操作='+'}
列表:

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class RxDilemma {


static class Message{
    Integer value;
    String operation;

    public Message(Integer value, String operation) {
        this.value = value;
        this.operation = operation;
    }

    @Override
    public String toString() {
        return "Message{" +
                "value=" + value +
                ", operation='" + operation + '\'' +
                '}';
    }
}

static class Changer1 {
    String operation;

    public Changer1(String operation) {
        this.operation = operation;
    }

    @Override
    public String toString() {
        return "Changer1{" +
                "operation='" + operation + '\'' +
                '}';
    }
}


static class Changer2 {
    Integer value;

    public Changer2(Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Changer2{" +
                "value=" + value +
                '}';
    }
}





static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
        .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
                .delay(2, TimeUnit.SECONDS));


static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
        .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
                .delay(2, TimeUnit.SECONDS));


static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
    @Override
    public Observable<Message> call(final Changer1 changer) {
        return dynamicMessage.last().map(new Func1<Message, Message>() {
            @Override
            public Message call(Message message) {
                message.operation = changer.operation;
                return message;
            }
        });
    }
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
    @Override
    public Observable<Message> call(final Changer2 changer2) {
        return dynamicMessage.last().map(new Func1<Message, Message>() {
            @Override
            public Message call(Message message) {
                if("+".equalsIgnoreCase(message.operation)){
                    message.value = message.value+changer2.value;
                } else if("-".equalsIgnoreCase(message.operation)){
                    message.value = message.value-changer2.value;
                }
                return message;
            }
        });
    }
}));

public static void main(String[] args) throws InterruptedException {

    dynamicMessage.subscribe(new Action1<Message>() {
        @Override
        public void call(Message message) {
            System.out.println(message);
        }
    });


    Thread.sleep(5000000);
}
}

所以我的预期结果是:

消息{value=1, 操作='+'}
消息{value=1, 操作='-'}
消息{value=-1, 操作='-'}
消息{value=1, 操作='+'}
消息{value=2,操作='+'}

我还考虑过用CombineLatest合并所有内容,但后来我发现CombineLatest没有提供更改了哪一个元素,如果没有这些信息,我将不知道需要对消息进行哪些更改。 有什么帮助吗?

最佳答案

换句话说,您希望减少扫描您的操作反射(reflect)状态的变化。有一个名为 reduce scan 的运算符,它完全可以满足您的要求。它接受每个发生的事件,并让您通过添加该转换的先前结果来转换它,这将是您的状态。

interface Changer {
    State apply(State state);
}

class ValueChanger implements Changer {
    ...
    State apply(State state){
        // implement your logic of changing state by this action and return a new one
    }
    ...
}


class OperationChanger implements Changer {
    ...
    State apply(State state){
        // implement your logic of changing state by this action and return a new one
    }
    ...
}

Observable<ValueChanger> valueChangers
Observable<OperationChanger> operationChangers

Observable.merge(valueChangers, operationChangers)
    .scan(initialState, (state, changer) -> changer.apply(state))
    .subscribe(state -> {
        // called every time a change happens
    });

请注意,我稍微更改了您的命名,以便更有意义。另外,我在这里使用 Java8 lambda 表达式。如果您无法在项目中使用它们,只需将它们替换为匿名类即可。

编辑:使用scan运算符而不是reduce,因为它会沿途提供每个结果,而不是仅发出最终结果

关于rx-java - 将 Observables 与反馈合并,将 Observable 与其自身合并,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42036527/

相关文章:

java - 如何在RxJava2中正确调用Single/Maybe,是否需要subscribeOn和observeOn和flatMap?

c# - Rx 运算符到不同的序列

c# - 当返回类型不重要时,是否有更优雅的方法来合并可观察对象?

system.reactive - 合并多个流,保持顺序并避免重复

javascript - 如何使用 React 创建 “twitter like” 剩余字符数

java - 不使用rxjava如何去抖

java - 使用 Roboguice 和 RxJava 进行继承和依赖注入(inject)

java - 在 RxJava 中取消订阅线程安全吗?

reactive-programming - 主线程上的 Flux/Publisher

java - 如何在响应式 java 中从 Mono<String> 获取字符串