c# - 使用 Reactive Extensions 过滤可观察流中的 "significant changes"

标签 c# system.reactive observable

我有一个 GeoLocationProvider (它实现了 IObservable<System.Device.Location.GeoCoordinate>,每 x 毫秒输出一次当前位置。

现在,我想使用 RX 读取所有这些 GPS 坐标,并且仅在位置变化显着(例如,行驶距离 > 10 米)时才通知订阅者位置变化。

我的“主要”代码

// [...]
IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();
LocationFeed locationFeed = new LocationFeed(locationProvider);

// register any interested observers on the locationFeed.
ConsoleLocationReporter c1 = new ConsoleLocationReporter("reporter0001");
locationFeed.Subscribe(c1);

我的 LocationFeed 实现如下所示:

using System;
using System.Device.Location;
using System.Reactive.Subjects;

namespace My.Namespace.Movement
{
    public class LocationFeed : ISubject<GeoCoordinate>, IDisposable
    {
        private readonly IDisposable _subscription;
        private readonly Subject<GeoCoordinate> _subject;

        public LocationFeed(IObservable<GeoCoordinate> observableSource)
        {
            _subject = new Subject<GeoCoordinate>();
            _subscription = observableSource.Subscribe(_subject); // TODO: Add logic to filter to only significant movement changes (> 10m)
        }

        public void Dispose()
        {
            _subscription?.Dispose();
            _subject?.Dispose();
        }

        public void OnNext(GeoCoordinate value)
        {
            _subject.OnNext(value);
        }

        public void OnError(Exception error)
        {
            _subject.OnError(error);
        }

        public void OnCompleted()
        {
            _subject.OnCompleted();
        }

        public IDisposable Subscribe(IObserver<GeoCoordinate> observer)
        {
            return _subject.Subscribe(observer);
        }
    }
}

问题一:GeoCoordinate提供了一个方法c1.DistanceTo(c2)计算两个坐标之间的距离。如果阈值与最后一个推送的相比大于x,我只想报告(发布)新的地理坐标。我该如何实现?

问题 2:subject 的使用是否正确以及我实现 ISubject 的方式?我不想在我的“主”代码中添加所有连线并将其全部移动到一个单独的类中。

最佳答案

我强烈建议不要实现 ISubject<T> (或者就此而言 IObservable<T>IObserver<T> )。相反,尝试组合现有的工厂和类型,然后将它们公开为“具有”关系而不是"is"关系。

如您所见,您的 LocationFeed纯粹是 observableSource 的包装器参数,所以似乎没有解决任何问题。我建议删除它。

关于您发布的问题,一种解决方案是使用大小为 2 且步长为 1 的缓冲区。

IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();

locationProvider
    .Buffer(2,1)
    .Where(buffer=>buffer[0].DistanceTo(buffer[1]) > 10)
    .Select(buffer=>buffer[1])
    .Subscribe(
        pos => Console.WriteLine(pos),
        ex => { },
        () => {});

或者你可以使用 Scan

IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();

locationProvider
    .Scan(Tuple.Create(GeoCoordinate.Zero,GeoCoordinate.Zero), (acc, cur)=>Tuple.Create(acc.Item2, cur))
    .Where(pair=>pair.Item1.DistanceTo(pair.Item2) > 10)
    .Select(pair=>pair.Item2)
    .Subscribe(
        pos => Console.WriteLine(pos),
        ex => { },
        () => {});

我不确定您对产生的第一个值(value)有什么要求。发表还是不发表?

编辑: 这是一个经过测试的解决方案(使用 Point 类型),它将推送 Unit当发生“重大”变化时。如果这不是您想要的,它应该足以让您修改以获得您真正想要的东西

void Main()
{
    var zero = new System.Drawing.Point(0,0);
    var fenceDistance = 10;

    var scheduler = new TestScheduler();
    var source = scheduler.CreateColdObservable(
        ReactiveTest.OnNext(1, new System.Drawing.Point(0,0)),
        ReactiveTest.OnNext(2, new System.Drawing.Point(0,9)),  //Not far enough
        ReactiveTest.OnNext(3, new System.Drawing.Point(0,10)), //Touches the fence
        ReactiveTest.OnNext(4, new System.Drawing.Point(0,15)), //Not far enough
        ReactiveTest.OnNext(5, new System.Drawing.Point(0,40))  //Breaches the fence        
        );

    var observer = scheduler.CreateObserver<Unit>();

    source
        .Scan(Tuple.Create(zero, zero), (acc, cur) =>
        {
            if (DistanceBetween(acc.Item1, cur) >= fenceDistance)
            {
                return Tuple.Create(cur, cur);
            }
            else
            {
                return Tuple.Create(acc.Item1, cur);
            }
        })
        .Where(pair => pair.Item1 == pair.Item2)
        .Select(pair => Unit.Default)
        .Subscribe(observer);


    scheduler.Start();

    ReactiveAssert.AreElementsEqual(new[] {
        ReactiveTest.OnNext(1, Unit.Default),
        ReactiveTest.OnNext(3, Unit.Default),
        ReactiveTest.OnNext(5, Unit.Default)
    },observer.Messages);

}

// Define other methods and classes here
public static double DistanceBetween(System.Drawing.Point a, System.Drawing.Point b)
{
    var xDelta = a.X -b.X;
    var yDelta = a.Y - b.Y;

    var distanceSqr = (xDelta * xDelta) + (yDelta * yDelta);
    return Math.Sqrt(distanceSqr);
}

关于c# - 使用 Reactive Extensions 过滤可观察流中的 "significant changes",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39356211/

相关文章:

angular - 可以从异步管道调用类方法吗?

c# - 当字符串字段值为空格时 XMLSerializer 中的问题

c# - 在移动服务数据库上启用代码优先迁移时出错

c# - "Exception of type ' System.Exception '' was thrown"的写法从何而来?

c# - Rx 运算符到不同的序列

Android Observables.zip 列出可观察对象

c# - 通用列表及对应方法

c# - 合并从多个实例触发的事件的可观察序列的正确方法

c# - 如何将以下类转换为 IObservable?

angular - RxJs:forkJoin() 未运行,因为我的可观察列表不完整