caching - RxJava 为 future 订阅者缓存最后一项

标签 caching rx-java event-bus

我已经实现了简单的 RxEventBus,即使没有订阅者,它也会开始发出事件。我想缓存最后发出的事件,以便如果第一个/下一个订阅者订阅,它只会收到一个(最后一个)项目。

我创建了描述我的问题的测试类:

public class RxBus {

ApplicationsRxEventBus applicationsRxEventBus;

public RxBus() {
    applicationsRxEventBus = new ApplicationsRxEventBus();
}

public static void main(String[] args) {
    RxBus rxBus = new RxBus();
    rxBus.start();
}

private void start() {
    ExecutorService executorService = Executors.newScheduledThreadPool(2);

    Runnable runnable0 = () -> {
        while (true) {
            long currentTime = System.currentTimeMillis();
            System.out.println("emiting: " + currentTime);
            applicationsRxEventBus.emit(new ApplicationsEvent(currentTime));
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };

    Runnable runnable1 = () -> applicationsRxEventBus
            .getBus()
            .subscribe(new Subscriber<ApplicationsEvent>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(ApplicationsEvent applicationsEvent) {
                    System.out.println("runnable 1: " + applicationsEvent.number);
                }
            });

    Runnable runnable2 = () -> applicationsRxEventBus
            .getBus()
            .subscribe(new Subscriber<ApplicationsEvent>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(ApplicationsEvent applicationsEvent) {
                    System.out.println("runnable 2: " + applicationsEvent.number);
                }
            });


    executorService.execute(runnable0);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executorService.execute(runnable1);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executorService.execute(runnable2);
}

private class ApplicationsRxEventBus {
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
    private final Observable<ApplicationsEvent> mBusObservable;

    public ApplicationsRxEventBus() {
        mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
        mBusObservable = mRxBus.cache();
    }

    public void emit(ApplicationsEvent event) {
        mRxBus.onNext(event);
    }

    public Observable<ApplicationsEvent> getBus() {
        return mBusObservable;
    }
}
private class ApplicationsEvent {
    long number;

    public ApplicationsEvent(long number) {
        this.number = number;
    }
}
}

即使没有订阅者,runnable0 也会发出事件。 runnable1 在 3 秒后订阅,并收到最后一个项目(这是可以的)。但是 runnable2 在 runnable1 之后 3 秒后订阅,并接收 runnable1 接收到的所有项目。我只需要为 runnable2 接收最后一个项目。我尝试过 RxBus 中的缓存事件:

private class ApplicationsRxEventBus {
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
    private final Observable<ApplicationsEvent> mBusObservable;

    private ApplicationsEvent event;

    public ApplicationsRxEventBus() {
        mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
        mBusObservable = mRxBus;
    }

    public void emit(ApplicationsEvent event) {
        this.event = event;
        mRxBus.onNext(event);
    }

    public Observable<ApplicationsEvent> getBus() {
        return mBusObservable.doOnSubscribe(() -> emit(event));
    }
}

但问题是,当 runnable2 订阅时,runnable1 接收事件两次:

emiting: 1447183225122
runnable 1: 1447183225122
runnable 1: 1447183225122
runnable 2: 1447183225122
emiting: 1447183225627
runnable 1: 1447183225627
runnable 2: 1447183225627

我确信,有 RxJava 运算符可以实现此目的。如何实现这一目标?

最佳答案

除了所有缓存的事件之外,您的 ApplicationsRxEventBus 还可以通过每当一个订阅时重新发送存储的事件来完成额外的工作。

您只需要一个 BehaviorSubject + toSerialized,因为它将保留最后一个事件并自行将其重新发送给订阅者。

关于caching - RxJava 为 future 订阅者缓存最后一项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33637968/

相关文章:

spring - 使用Spring-Reactor针对不同JVM进行异步事件驱动编程

javascript - 要求 js 缓存设置 urlArgs 半身像的设置文件

java - HIbernate 查询缓存将结果缓存为 null

java - 如何保证RXJava Observable指令在线程范围内执行?

android - 在 View 尚未开始时使用总线?

java - Guava 事件总线 : NullPointerException when Posting from Thread to UI

通过选择自定义数据格式进行 MySQL 复制

asp.net - 如何在非 ASP.NET 应用程序中使用 ASP.NET 缓存对象?

android - Flowable 的两个序列没有到达 toList()

java - 合并两个 Observables/Singles 的方法有哪些?