scala - 在 Rx(或 RxJava/RxScala)中,如何制作一个自动重置的有状态锁存器映射/过滤器来测量流中耗时以触及障碍?

标签 scala system.reactive rx-java volatility rx-scala

如果问题措辞不当,我深表歉意,我会尽力而为。

如果我有一个带有时间的值序列 Observable[(U,T)]其中 U 是一个值,T 是一个类似时间的类型(或者我想的任何差异),我怎么能写一个运算符,它是一个自动重置的一键式屏障,当 abs(u_n - u_reset) < barrier 时它是静默的, 但吐出 t_n - t_reset如果屏障被触及,此时它也会重置 u_reset = u_n .

也就是说,这个运算符接收到的第一个值成为基线,它什么都不输出。从今以后,它监视流的值,一旦其中一个值超出基线值(高于或低于),它就会发出耗时(由事件的时间戳测量),并重置基线。然后将处理这些时间以形成波动率的高频估计。

作为引用,我正在尝试编写 http://www.amazon.com/Volatility-Trading-CD-ROM-Wiley/dp/0470181990 中概述的波动率估算器,而不是测量标准偏差(在规则的同质时间的偏差),你重复测量突破某个固定障碍量的障碍所花费的时间。

具体来说,可以使用现有的运算符来编写吗?我对如何重置状态有点困惑,但也许我需要制作两个嵌套运算符,一个是一次性的,另一个是不断创建那个一次性的......我知道这可以通过写作来完成一个手工,但我需要编写我自己的发布者等等。

谢谢!

最佳答案

我不完全理解示例中的算法和你的变量,但你可以使用 flatMap 和一些堆状态并返回 empty()just() 根据需要:

int[] var1 = { 0 };
source.flatMap(v -> {
    var1[0] += v;
    if ((var1[0] & 1) == 0) {
       return Observable.just(v);
    }
    return Observable.empty();
});

如果你因为有多个消费者而需要一个按序列的状态,你可以延迟整个事情:

Observable.defer(() -> {
    int[] var1 = { 0 };
    return source.flatMap(v -> {
        var1[0] += v;
        if ((var1[0] & 1) == 0) {
           return Observable.just(v);
        }
        return Observable.empty();
    });
}).subscribe(...);

关于scala - 在 Rx(或 RxJava/RxScala)中,如何制作一个自动重置的有状态锁存器映射/过滤器来测量流中耗时以触及障碍?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32322783/

相关文章:

scala - 使用Scala泛型: Class[T] does not take parameters

scala - 如何根据行数重新分区 Spark 数据帧?

javascript - RxJs 将流拆分为多个流

c# - 如何在IObservable管道中保留异常并在最后将其重新抛出?

java - java rxObservable 的结果集中没有聚合数据

java - RX Java 2 顺序处理契约(Contract)

java - 除了为绑定(bind)添加语法糖之外,ScalaFX 还有哪些好处?

c# - 订阅 ObservableCollection 项目属性已更改 - WPF

rx-java - RxJava可观察到的: Invoke onError and then retry

eclipse - 如何在sbt项目中声明对Scalding的依赖?