我已经实现了简单的 RxEventBus,即使没有订阅者,它也会开始发出事件。我想缓存最后发出的事件,以便如果第一个/下一个订阅者订阅,它只会收到一个(最后一个)项目。
我创建了描述我的问题的测试类:
public class RxBus {
ApplicationsRxEventBus applicationsRxEventBus;
public RxBus() {
applicationsRxEventBus = new ApplicationsRxEventBus();
}
public static void main(String[] args) {
RxBus rxBus = new RxBus();
rxBus.start();
}
private void start() {
ExecutorService executorService = Executors.newScheduledThreadPool(2);
Runnable runnable0 = () -> {
while (true) {
long currentTime = System.currentTimeMillis();
System.out.println("emiting: " + currentTime);
applicationsRxEventBus.emit(new ApplicationsEvent(currentTime));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable runnable1 = () -> applicationsRxEventBus
.getBus()
.subscribe(new Subscriber<ApplicationsEvent>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(ApplicationsEvent applicationsEvent) {
System.out.println("runnable 1: " + applicationsEvent.number);
}
});
Runnable runnable2 = () -> applicationsRxEventBus
.getBus()
.subscribe(new Subscriber<ApplicationsEvent>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(ApplicationsEvent applicationsEvent) {
System.out.println("runnable 2: " + applicationsEvent.number);
}
});
executorService.execute(runnable0);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(runnable1);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(runnable2);
}
private class ApplicationsRxEventBus {
private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
private final Observable<ApplicationsEvent> mBusObservable;
public ApplicationsRxEventBus() {
mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
mBusObservable = mRxBus.cache();
}
public void emit(ApplicationsEvent event) {
mRxBus.onNext(event);
}
public Observable<ApplicationsEvent> getBus() {
return mBusObservable;
}
}
private class ApplicationsEvent {
long number;
public ApplicationsEvent(long number) {
this.number = number;
}
}
}
即使没有订阅者,runnable0 也会发出事件。 runnable1 在 3 秒后订阅,并收到最后一个项目(这是可以的)。但是 runnable2 在 runnable1 之后 3 秒后订阅,并接收 runnable1 接收到的所有项目。我只需要为 runnable2 接收最后一个项目。我尝试过 RxBus 中的缓存事件:
private class ApplicationsRxEventBus {
private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
private final Observable<ApplicationsEvent> mBusObservable;
private ApplicationsEvent event;
public ApplicationsRxEventBus() {
mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
mBusObservable = mRxBus;
}
public void emit(ApplicationsEvent event) {
this.event = event;
mRxBus.onNext(event);
}
public Observable<ApplicationsEvent> getBus() {
return mBusObservable.doOnSubscribe(() -> emit(event));
}
}
但问题是,当 runnable2 订阅时,runnable1 接收事件两次:
emiting: 1447183225122
runnable 1: 1447183225122
runnable 1: 1447183225122
runnable 2: 1447183225122
emiting: 1447183225627
runnable 1: 1447183225627
runnable 2: 1447183225627
我确信,有 RxJava 运算符可以实现此目的。如何实现这一目标?
最佳答案
除了所有缓存的事件之外,您的 ApplicationsRxEventBus
还可以通过每当一个订阅时重新发送存储的事件来完成额外的工作。
您只需要一个 BehaviorSubject
+ toSerialized
,因为它将保留最后一个事件并自行将其重新发送给订阅者。
关于caching - RxJava 为 future 订阅者缓存最后一项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33637968/