Java Rx - 过滤定时事件

标签 java reactive-programming rx-java

假设我的可观察值发出整数。如果在过去 30 秒内 x 不是由 observable 生成的,我希望我的观察者触发整数 x。

行为与谴责类似,但相反。

最佳答案

也许这不是更优雅和简洁的解决方案,但我认为您可以使用如下所示的过滤器

这个解决方案并不完美,因为 : 由发射器触发,因此要按预期工作,您必须在 TIME 之后发出一个假事件来刷新最后一个

private static class TimedFilter<T> implements   Func1<T, Collection<T>>{

    private NavigableMap<Long,T> treeMap = new TreeMap<>();
    private HashMap<T,Long> indexes = new HashMap<>();
    private final long delta_millis;

    public TimedFilter(long delta_millis) {
        this.delta_millis = delta_millis;
    }

    @Override
    public Collection<T> call(T x) {

        long now_millis = System.currentTimeMillis();

        Long oldIndex = indexes.put(x, now_millis);

        if(oldIndex!=null)
            treeMap.remove(oldIndex); // throws NPE - if the specified key is null and this map uses natural ordering

        treeMap.put(now_millis,x);

        long then_millis = now_millis - delta_millis;
        Collection<T> values = treeMap.headMap(then_millis).values();
        for (T v : values){
            indexes.remove(v);
        }

        treeMap = treeMap.tailMap(then_millis, true);
        return values;
    }
}

测试

void testFunction(){
    getEmitter()
            .map(new TimedFilter<>(TIME))
            .flatMap(Observable::from)
            .subscribe(System.out::print)
    ;
}

更新: @tilois 建议 System.nanoTime() 更可靠,如果您运行 Android 也可以使用 SystemClock.elapsedRealtime() code> 返回毫秒。 ;

关于Java Rx - 过滤定时事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33472633/

相关文章:

php - 在阻塞应用程序中使用响应式 PHP

java - 从项目 react 器中的通量中采样除第一个元素之外的所有元素

java - 取出元素直到某个字符并用 RxJava 将它们分组

multithreading - 意外的异步行为 : Springs's @Async vs RxJava

java - AspectJ:避免执行枚举

java - 使用java异常进行条件检查

java - 以 rx 方式与 Jersey 客户端合作

android - 使用 rxbinding 时我应该取消订阅吗?

java - 数组算法及其时间复杂度分析

java - OrientDB:事务处于 Activity 状态时无法更改架构。架构更改不是事务性的