假设我们有一些从传感器读取的样本缓冲区。每次我在缓冲区上运行一个方法(即 buffer.fetchNewSamples()
)时,都会向缓冲区提供新的样本。我如何从这样的对象创建Java Rx Observable,该对象将从缓冲区中一一返回每个样本,并在发出最后一个样本时调用 buffer.fetchNewSamples() ,然后以相同的方式继续? p>
我刚刚开始使用 Rx,虽然创建一个返回(即此类缓冲区的平均值)的可观察值非常容易,但我不知道如何在不创建新线程并进行一些同步的情况下创建上述内容。 .
最佳答案
给定一个传感 API float[]measure()
,您可以通过以下顺序进行定时测量和发射:
Observable.interval(10, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.map(t -> measure())
.concatMap(fa -> Observable.range(0, fa.length).map(i -> fa[i]))
.subscribe(...)
说明
给定 10 毫秒的间隔,丢弃任何未请求的内容,并将计时器值映射到测量值数组(每 10 毫秒发生一次)。给定一个接一个发出的测量值数组,通过相当于索引的 for 循环将其展平为数组元素,但要确保下一个测量值仅在前一个测量值完成发出数组元素之后出现。最后,你会得到一个 float 流。
如果您想在上次调用的所有测量结果消耗完毕后立即进行测量,您可以执行以下操作:
BehaviorSubject<Integer> bs = BehaviorSubject.create(1);
bs.observeOn(Schedulers.trampoline())
.map(t -> measure())
.concatMap(fa ->
Observable.range(0, fa.length).map(i -> fa[i])
.finallyDo(() -> bs.onNext(1))
)
说明 我们利用BehaviorSubject 的行为将其存储的值发送给第一个订阅者,我们将使用它来触发测量。为了避免无限递归,我们将在蹦床调度器上观察该主题。一旦信号通过,我们就进行测量并将其转换为如上所述的 float 序列。唯一的区别是,每当这样的子序列完成时,它都会向BehaviorSubject发出信号以触发另一个测量。
关于java - 如何从一组样本创建 Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33673771/