假设我有一个可观察的,它发出 2D 点,表示某些路线的转折点
public static double[][] points = new double[][]{
{0, 0},
{0, 10},
{1, 10},
{1, 0},
{0, 0}
};
public static Observable<double[]> routePoints() {
return Observable
.from(points);
}
这意味着机器人从原点(0,0)
开始,然后走到矩形的左下角,然后右下角,然后返回,最后回到原点。
假设现在我想要一个可观察的对象,它提供该点,就好像机器人以恒定速度移动一样,例如每秒 1 个单位。
因此,第一个运算符(operator)接收 (0,0)
并按原样重新传输。
然后它接收到 (0, 10)
并看到,到该点的距离是 10
个单位,并且应该需要 10
秒才能到达该点。因此,新的可观察量应该发出
(0, 1)
(0, 2)
(0, 3)
(0, 4)
(0, 5)
(0, 6)
(0, 7)
(0, 8)
(0, 9)
(0, 10)
每秒一对,直到到达最后一个接收点并重新传输。然后运算符(operator)应该采取下一个点并对其执行相同的操作。
如何使用 ReactiveX 来实现这一点?
我怀疑我们应该在这里实现“背压”来减慢可观察源的速度,直到随后的源及时重新发出所有内容?
更新
我编写了下面的代码并且它有效。它使用两个 flatMap
。一种是提供额外的点,并为每个点附上时间戳(以秒为单位),另一种 flatMap
引入适合时间戳的延迟。
代码可以工作,但看起来很复杂。问题仍然存在:ReactiveX 库是否包含用于此类事情的现成工具:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class EmitByTime {
public static double speed = 1;
public static double[][] points = new double[][]{
{0, 0},
{0, 10},
{1, 10},
{1, 0},
{0, 0}
};
public static Observable<double[]> routePoints() {
return Observable
.from(points);
}
public static Observable<double[]> routeFeeder() {
return routePoints()
.flatMap(new Func1<double[], Observable<double[]>>() {
double[] previous = null;
@Override
public Observable<double[]> call(double[] doubles) {
if( previous == null) {
previous = new double[] {doubles[0], doubles[1], 0};
return Observable.just(previous);
}
else {
double dx = doubles[0] - previous[0];
double dy = doubles[1] - previous[1];
double dist = Math.sqrt( dx*dx + dy*dy );
dx /= dist;
dy /= dist;
double vx = dx * speed;
double vy = dy * speed;
double period = dist / speed;
double end = previous[2] + period;
double start = Math.ceil(previous[2] + 1);
ArrayList<double[]> sequence = new ArrayList<>();
for(double t = start; t < end; t+=1) {
double tt = (t - previous[2]);
double x = previous[0] + tt * vx;
double y = previous[1] + tt * vy;
sequence.add(new double[] {x, y, t});
}
previous = new double[] {doubles[0], doubles[1], end};
sequence.add(previous);
return Observable.from(sequence);
}
}
})
.flatMap(new Func1<double[], Observable<double[]>>() {
long origin = System.currentTimeMillis();
@Override
public Observable<double[]> call(double[] doubles) {
long now = System.currentTimeMillis();
long due = Math.round(doubles[2]*1000) + origin;
if( due <= now ) {
return Observable.just(doubles);
}
else {
return Observable.just(doubles).delay(due-now, TimeUnit.MILLISECONDS);
}
}
})
;
}
public static void main(String[] args) throws InterruptedException {
routeFeeder().subscribe(new Action1<double[]>() {
@Override
public void call(double[] doubles) {
System.out.println(Arrays.toString(doubles));
}
});
Thread.sleep(20000);
}
}
最佳答案
这感觉主要像是一个寻路问题。
我可能有一种方法可以将您的一组航路点转换为穿过所有航路点的路线(这是路径查找位)。
类似于:
public Observable<Point> calculateRoute(Observable<Point> waypoints);
其中航路点是您上面提到的“转折点”;
然后,您可以将 Observable.interval
流与您的 Observable
路由一起zip
,该流将每秒发出一次每个路由。
所以也许是这样的:
public Observable<Point> emitRoutePerSecond(Observable<Point> wayPoints) {
return Observable.interval(1, TimeUnit.SECONDS)
.zipWith(calculateRoute(wayPoints), (i, point) -> point);
}
关于java - 如何在正确(实时)的时间内传递可观察的元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42068076/