我是 java RX 的新手,我遇到了一个问题,希望有人可以告诉我我做错了什么。
问题: 我正在跟踪很多事件,此类事件的触发方式如下:
Observable<Long> otherObservable = Observable.empty();
public void myMethod(){
Observable<Long> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS);
final Subscriber<Long> timeSubscriber = new Subscriber<Long>() {
@Override
public void onCompleted() {
// nothing really
}
@Override
public void onError(final Throwable throwable) {
// nothing really
}
@Override
public void onNext(final Long number) {
// Here i do something
}
};
return Observable.merge(timerObservable, otherObservable)
.first()
.subscribe(timeSubscriber);
}
所以基本上它会在 VARIABLE_TIME 之后触发一个事件。
效果很好,但现在我面临着我有太多事件的事实。
所以我考虑使用去抖和缓冲。
我想做的是:
仍然创建许多在 N 秒后发出事件的可观察对象。
从他们每个人那里收集信息(一个长的或可能是一个字符串)
经过一段延迟时间(缓冲时间)后,将包含所有收集到的信息的列表发送给订阅者。
到目前为止我已经这样做了:
Observable<List<Long>> otherObservable = Observable.empty();
otherObservable.debounce(10L, SECONDS).buffer(20L, SECONDS);
Observable<List<Long>> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS).buffer(1);
Subscriber<List<Long> > observerSuscriber = new Subscriber<List<Long>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(final Throwable throwable) {
}
@Override
public void onNext(final List<Long> ids ) {
// do something here
}
};
Observable.merge(otherObservable, observable1)
.first()
.subscribe(observerSuscriber);
但是像这样我仍然在发出后立即收到消息。
我想知道是否有办法做到这一点?有任何想法吗?我正在使用 java RX 1.2
最佳答案
After a delay time (buffer time) Send a list with all the collected info to the subscriber.
你在那里回答了你自己的问题!使用缓冲区
运算符:
Flowable<String> stream = ...
Flowable<List<String>> lists = stream.buffer(5, TimeUnit.SECONDS);
关于java - Java RX 中可通过缓冲区和多个值更新进行观察?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50223681/