c# - 使用 Rx 执行延迟采样的最佳方法是什么?

标签 c# system.reactive reactive-programming

我正在开发一个 Xamarin 应用程序,我在其中扩展了 Connectivity使用 Rx 而不是事件的插件。

我的目标是在重新建立连接时引入轻微的延迟,以便网络适配器有时间连接到 Internet(UWP 的解决方法)。如果在此延迟内出现任何值,则只需要保留最后一个值,因为只有当前连接状态才重要。

这工作正常,但感觉有点 hacky:

internal static class ConnectivityExtensions
{
    public static IObservable<bool> ToObservable(this IConnectivity @this)
    {
        var connectivity = Observable
            .FromEventPattern<ConnectivityChangedEventHandler, ConnectivityChangedEventArgs>(
                handler => @this.ConnectivityChanged += handler,
                handler => @this.ConnectivityChanged -= handler)
            .Select(args => args.EventArgs.IsConnected);

        var sampling = connectivity
            .Timestamp()
            .Select(ts => new
            {
                ts.Value,
                ts.Timestamp,
                // If reconnection, delay subscriber notification for 250ms
                DelayUntil = ts.Value ? (DateTimeOffset?)ts.Timestamp.Add(TimeSpan.FromMilliseconds(250)) : null
            })
            .Scan((acc, current) => new
            {
                current.Value,
                current.Timestamp,
                // If current notification is during reconnection notification delay period, delay the current notification too
                DelayUntil = current.Timestamp < acc.DelayUntil ? acc.DelayUntil : current.DelayUntil
            })
            // Perform reconnection delay
            .Delay(x => x.DelayUntil.HasValue
                ? Observable.Return(x.DelayUntil.Value).Delay(x.DelayUntil.Value)
                : Observable.Empty<DateTimeOffset>())
            // All delayed notifications are delayed until the same time, so we only need one notification to trigger sampling after delay
            .DistinctUntilChanged()
            .Select(_ => Unit.Default);

        return connectivity
            .Sample(sampling)
            .StartWith(@this.IsConnected)
            .DistinctUntilChanged()
            .Replay(1)
            .RefCount();
    }
}

示例输出:

通过这个示例,您有望看到我正在做的事情的好处。延迟会阻止订阅者在“连接”发生变化但尚未建立互联网连接 (UWP) 时处理数据。此外,这还可以保护订阅者免受任何快速“开/关”通知的影响。

                                                                                     // StartsWith: True
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delay 250ms
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: True, Ignored due to DistinctUntilChanged
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Sample: False
Thread.Sleep(TimeSpan.FromMilliseconds(250));
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delay 250ms, Discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous, Discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delayed by previous, Discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: False, Ignored due to DistinctUntilChanged
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delay 250ms, Discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: False, Ignored due to DistinctUntilChanged

// Final Output:
// True
// False

是否有更优化的方式来实现这种类型的延迟采样?

最佳答案

如果我理解正确的话,这里有一个关于事情如何发展的大理石图:

T (millis)           : 0----250--500--1000-1250-1500-1750-2000
Connectivity         : ---F-------T---F----T-------F--T-------
ScanValueDelayUntil  : ---null----800-null-1500----null2100---
Sampling             : -------------x-----------x-----------x-
ResultSampled        : T------------T-----------T-----------T-
ResultSampledDistinct: T--------------------------------------

这似乎意义不大。你能描述一下你希望结果代表什么,或者更正我的错误吗?

关于c# - 使用 Rx 执行延迟采样的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45420392/

相关文章:

c# - 上传文件并发送到服务层,即 c# 类库

.net - 如何创建从 MSMQ 消息队列中读取的 IObservable<T>?

c# - 处理除集合之外的绑定(bind)事件,以及在删除C#时取消绑定(bind)的事件-多线程

angular - 仅当第一次失败时才执行第二次 http 调用

java - RxJava : Publish subject doOnSubscribe never gets called

c# - 从异步 ApiController 返回即时响应

c# - 通用 Windows 项目 - HttpClient 异常

unit-testing - 测试响应式(Reactive)扩展 - 如何将测试调度程序与 ToTask() 一起使用?

java - RxJava - 获取列表中的每一项

c# - 从 byte* 转换为 byte[]