java - 如何动态缩放突发发射流的去抖?

标签 java kotlin rx-java2 rx-kotlin

我有一个缓冲流,在发布已缓冲的元素列表之前等待预定的静默时间:

INTEGERS
     .share()
     .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler))
     .map { durations ->
       ... 
     }

我想让 DEBOUNCE_TIME 根据缓冲项目的平均值动态调整,但我很难弄清楚如何实现这一点。

最佳答案

您可以推迟去抖,取出其中一项,并在确定新的去抖值后触发重复:

int DEBOUNCE_TIME = 100;
AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME);
PublishSubject<Integer> mayRepeat = PublishSubject.create();

AtomicInteger counter = new AtomicInteger();

Observable<Integer> INTEGERS =
        Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS)
                .map(w -> counter.incrementAndGet()));

INTEGERS.publish(o ->
        o.buffer(
            Observable.defer(() ->
                o.debounce(
                    debounceTime.get(), TimeUnit.MILLISECONDS)
            )
            .take(1)
            .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b))
        )
    )
    .map(list -> {
        int nextDebounce = Math.min(100, list.size() * 100);
        debounceTime.set(nextDebounce);
        mayRepeat.onNext(1);
        return list;
    })
    .blockingSubscribe(System.out::println);

打印:

[1, 2]
[3, 4, 5]
[6, 7, 8, 9]
[10]

关于java - 如何动态缩放突发发射流的去抖?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45380773/

相关文章:

android - Kotlin 协程不等待完成

android - RxJava2 map/flatMap 与 flatMapIterable

java - RxJava : merging observables on different threads

java - 当我按下按钮但没有所有值时,如何从文本字段添加值

java - 银行识别码 validator

java - 如何在多模块android项目(kotlin + java)上配置SonarQube(使用jacoco)?

java - Mockk - 模拟实现多个接口(interface)的最终类​​时出现 ClassCastException

java - RxJava : How to dynamically emit items from an HTTP response

java - Hibernate @AttributeOverrides 多个属性而不显式命名每个属性

用于测试目的的 Java JRE 内置图像