java - Java RX 中可通过缓冲区和多个值更新进行观察?

标签 java rx-java

我是 java RX 的新手,我遇到了一个问题,希望有人可以告诉我我做错了什么。

问题: 我正在跟踪很多事件,此类事件的触发方式如下:

Observable<Long> otherObservable = Observable.empty();

public void myMethod(){
Observable<Long> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS);
        final Subscriber<Long> timeSubscriber = new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                // nothing really
            }

            @Override
            public void onError(final Throwable throwable) {
                // nothing really
            }

            @Override
            public void onNext(final Long number) {
                // Here i do something
            }
        };

        return Observable.merge(timerObservable, otherObservable) 
                .first()
                .subscribe(timeSubscriber);
}

所以基本上它会在 VARIABLE_TIME 之后触发一个事件。

效果很好,但现在我面临着我有太多事件的事实。

所以我考虑使用去抖和缓冲。

我想做的是:

仍然创建许多在 N 秒后发出事件的可观察对象。

从他们每个人那里收集信息(一个长的或可能是一个字符串)

经过一段延迟时间(缓冲时间)后,将包含所有收集到的信息的列表发送给订阅者。

到目前为止我已经这样做了:

Observable<List<Long>> otherObservable = Observable.empty();

otherObservable.debounce(10L, SECONDS).buffer(20L, SECONDS);
Observable<List<Long>> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS).buffer(1);

Subscriber<List<Long> > observerSuscriber = new Subscriber<List<Long>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(final Throwable throwable) {

            }

            @Override
            public void onNext(final List<Long> ids ) {
             // do something here
            }
        };

Observable.merge(otherObservable, observable1)
                  .first()
                  .subscribe(observerSuscriber);

但是像这样我仍然在发出后立即收到消息。

我想知道是否有办法做到这一点?有任何想法吗?我正在使用 java RX 1.2

最佳答案

After a delay time (buffer time) Send a list with all the collected info to the subscriber.

你在那里回答了你自己的问题!使用缓冲区运算符:

Flowable<String> stream = ...
Flowable<List<String>> lists = stream.buffer(5, TimeUnit.SECONDS);

关于java - Java RX 中可通过缓冲区和多个值更新进行观察?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50223681/

相关文章:

java - Hibernate 字节码增强单向 ManyToOne

java - 如何修复 JSR-303 验证和 orientdb 的兼容性问题

java - 在Java中,为什么有的变量需要先初始化,有的只需要声明?

java - RxJava : Observing messages emitted from a socket

android - 使用 RxJava 从 2 个可观察对象中获取一个结果

java - 使用 Observables RxJava 进行文件验证

rx-java - 如何暂停事件通过可观察对象流动?

java - 线程中的异常 "main"java.util.IllegalFormatConversionException : f ! = java.lang.Integer

android - 使用 Observable Zip 行为不当

java - 如何检查在 YourKit 中记录对象分配期间创建了多少个类的实例?