如果问题措辞不当,我深表歉意,我会尽力而为。
如果我有一个带有时间的值序列 Observable[(U,T)]
其中 U 是一个值,T 是一个类似时间的类型(或者我想的任何差异),我怎么能写一个运算符,它是一个自动重置的一键式屏障,当 abs(u_n - u_reset) < barrier
时它是静默的, 但吐出 t_n - t_reset
如果屏障被触及,此时它也会重置 u_reset = u_n
.
也就是说,这个运算符接收到的第一个值成为基线,它什么都不输出。从今以后,它监视流的值,一旦其中一个值超出基线值(高于或低于),它就会发出耗时(由事件的时间戳测量),并重置基线。然后将处理这些时间以形成波动率的高频估计。
作为引用,我正在尝试编写 http://www.amazon.com/Volatility-Trading-CD-ROM-Wiley/dp/0470181990 中概述的波动率估算器,而不是测量标准偏差(在规则的同质时间的偏差),你重复测量突破某个固定障碍量的障碍所花费的时间。
具体来说,可以使用现有的运算符来编写吗?我对如何重置状态有点困惑,但也许我需要制作两个嵌套运算符,一个是一次性的,另一个是不断创建那个一次性的......我知道这可以通过写作来完成一个手工,但我需要编写我自己的发布者等等。
谢谢!
最佳答案
我不完全理解示例中的算法和你的变量,但你可以使用 flatMap
和一些堆状态并返回 empty()
或 just()
根据需要:
int[] var1 = { 0 };
source.flatMap(v -> {
var1[0] += v;
if ((var1[0] & 1) == 0) {
return Observable.just(v);
}
return Observable.empty();
});
如果你因为有多个消费者而需要一个按序列的状态,你可以延迟
整个事情:
Observable.defer(() -> {
int[] var1 = { 0 };
return source.flatMap(v -> {
var1[0] += v;
if ((var1[0] & 1) == 0) {
return Observable.just(v);
}
return Observable.empty();
});
}).subscribe(...);
关于scala - 在 Rx(或 RxJava/RxScala)中,如何制作一个自动重置的有状态锁存器映射/过滤器来测量流中耗时以触及障碍?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32322783/