c# - 如何使用Observables实现轮询?

标签 c# async-await system.reactive reactive-programming rx-java

我有一个参数化的rest调用,应该使用不同的参数每五秒钟执行一次:

Observable<TResult> restCall = api.method1(param1);


我需要创建一个Observable<TResult>,它将每隔5秒以不同的param1值轮询restCall。如果api调用失败,我需要得到一个错误,并在5秒钟内进行下一个调用。仅当restCall完成(成功/错误)时,才应测量两次呼叫之间的间隔。

我目前正在使用RxJava,但是.NET示例也不错。

最佳答案

介绍

首先,我是一个.NET专家,我知道这种方法使用的一些习语在Java中没有直接等效项。但是,我谨遵从您的话,并根据这是.NET专家们会喜欢的一个很好的问题,继续进行下去,并希望它将引导您沿着rx-java的正确道路前进,这是我从未研究过的。这是一个很长的答案,但主要是解释-解决方案代码本身很短!

两者之一的使用

我们将需要首先整理一些工具以帮助该解决方案。首先是Either<TLeft, TRight>类型的使用。这很重要,因为每个调用都有两个可能的结果,要么是好结果,要么是错误。但是我们需要将它们包装为单一类型-我们不能使用OnError向后发送错误,因为这会终止结果流。两者看上去都像元组,因此更容易处理这种情况。 Rxx library具有Either的非常完整和良好的实现,但这是一个简单的用法通用示例,然后是一个对我们有用的简单实现:

var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());

/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
    public abstract bool IsLeft { get; }
    public bool IsRight { get { return !IsLeft; } }
    public abstract TLeft Left { get; }
    public abstract TRight Right { get;  }    
}

public static class Either
{
    public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TLeft _leftValue;

        public LeftValue(TLeft leftValue)
        {
            _leftValue = leftValue;
        }

        public override TLeft Left { get { return _leftValue; } }
        public override TRight Right { get { return default(TRight); } }
        public override bool IsLeft { get { return true; } }
    }

    public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TRight _rightValue;

        public RightValue(TRight rightValue)
        {
            _rightValue = rightValue;
        }

        public override TLeft Left { get { return default(TLeft); } }
        public override TRight Right { get { return _rightValue; } }
        public override bool IsLeft { get { return false; } }
    }

    // Factory functions to create left or right-valued Either instances
    public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
    {
        return new LeftValue<TLeft, TRight>(leftValue);
    }

    public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
    {
        return new RightValue<TLeft, TRight>(rightValue);
    }
}


请注意,按照惯例,在使用Either建模成功或失败时,将Right用作成功值,因为它当然是“ Right” :)

一些辅助功能

我将使用一些辅助函数来模拟您的问题的两个方面。首先,这是一个生成参数的工厂-每次调用它时,它将返回以1开头的整数序列中的下一个整数:

// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
    return ++count; 
}


接下来,这是一个函数,将您的Rest调用模拟为IObservable。该函数接受一个整数,并且:


如果整数是偶数,则返回一个Observable,该Observable立即发送一个OnError。
如果整数为奇数,则返回字符串,该字符串将整数与“ -ret”连接,但仅在经过一秒钟之后。我们将使用它来检查轮询间隔是否符合您的要求-作为完成的调用之间的暂停(无论它们花费多长时间),而不是规则的间隔。


这里是:

// A asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
    return x % 2 == 0
        ? Observable.Throw<string>(new Exception())
        : Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));   
}


现在进行中

以下是我称为Poll的合理通用重用函数。它接受将要轮询的异步函数,该函数的参数工厂,所需的休息时间(无双关!),最后是要使用的IScheduler。

我想出的最简单的方法是使用Observable.Create,它使用调度程序来驱动结果流。 ScheduleAsync是一种使用.NET async / await形式进行计划的方法。这是一个.NET惯用法,使您可以以命令式方式编写异步代码。 async关键字引入了一个异步函数,该函数然后可以在其主体中await一个或多个异步调用,并且仅在调用完成时才继续。 I wrote a long explanation of this style of scheduling in this question, which includes the older recursive the style that might be easier to implement in an rx-java approach.代码如下所示:

public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
    Func<TArg, IObservable<TResult>> asyncFunction,
    Func<TArg> parameterFactory,
    TimeSpan interval,
    IScheduler scheduler)
{
    return Observable.Create<Either<Exception, TResult>>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) => {
            while(!ct.IsCancellationRequested)
            {
                try
                {
                    var result = await asyncFunction(parameterFactory());
                    observer.OnNext(Either.Right<Exception,TResult>(result));
                }
                catch(Exception ex)
                {
                    observer.OnNext(Either.Left<Exception, TResult>(ex));
                }
                await ctrl.Sleep(interval, ct);
            }
        });        
    });    
}


对此进行分解,通常Observable.Create是用于创建IObservable的工厂,它使您可以更好地控制将结果发布到观察者的方式。它经常被忽略,而倾向于不必要的原始图元组成。

在这种情况下,我们将使用它来创建Either<TResult, Exception>流,以便我们可以返回成功和失败的轮询结果。

Create函数接受一个观察者,该观察者表示我们通过OnNext / OnError / OnCompleted将结果传递给的订阅者。我们需要在IDisposable调用内返回一个Create-在.NET中,这是订阅服务器可以通过其取消订阅的句柄。这在这里特别重要,因为否则轮询将永远进行下去-或至少它将永远不会OnComplete

ScheduleAsync(或普通Schedule)的结果就是这样的句柄。处置后,它将取消我们计划的任何未决事件-从而终止轮询循环。在本例中,用于管理间隔的Sleep是可取消操作,尽管可以轻松地修改Poll函数以接受也接受asyncFunction的可取消CancellationToken

ScheduleAsync方法接受一个将调度事件的函数。它传递了两个参数,第一个ctrl是调度程序本身。第二个ct是CancellationToken,我们可以使用它来查看是否已请求取消(由订阅者布置其订阅句柄)。

轮询本身是通过无限while循环执行的,该循环仅在CancellationToken指示已请求取消时才终止。

在循环中,我们可以使用async / await的魔力来异步调用轮询功能,但仍将其包装在异常处理程序中。太棒了!假设没有错误,我们将结果作为Either的正确值通过OnNext发送给观察者。如果有异常,我们将其作为Either的左值发送给观察者。最后,我们在调度程序上使用Sleep函数在休息间隔之后安排一次唤醒调用-不要与Thread.Sleep调用混淆,该调用通常不会阻塞任何线程。请注意,Sleep接受CancellationToken使其也被中止!

我想您会同意,这是async / await的一个很酷的用法,它可以简化本来非常棘手的问题!

用法示例

最后,这是一些调用Poll的测试代码以及​​示例输出-对于LINQPad爱好者,此答案中的所有代码将一起在LINQPad中运行,并引用Rx 2.1程序集:

void Main()
{
    var subscription = Poll(SomeRestCall,
                            ParameterFactory,
                            TimeSpan.FromSeconds(5),
                            ThreadPoolScheduler.Instance)
        .TimeInterval()                            
        .Subscribe(x => {
            Console.Write("Interval: " + x.Interval);
            var result = x.Value;
            if(result.IsRight)
                Console.WriteLine(" Success: " + result.Right);
            else
                Console.WriteLine(" Error: " + result.Left.Message);
        });

    Console.ReadLine();    
    subscription.Dispose();
}

Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.


请注意,如果立即返回错误,则结果之间的间隔为5秒(轮询间隔),或者为成功结果则为6秒(轮询间隔加上模拟的REST调用持续时间)。

编辑-这是一个不使用ScheduleAsync的替代实现,但使用旧式递归调度并且不使用async / await语法。如您所见,这比较麻烦-但它也支持取消asyncFunction可观察到的状态。

    public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
        Func<TArg, IObservable<TResult>> asyncFunction,
        Func<TArg> parameterFactory,
        TimeSpan interval,
        IScheduler scheduler)
    {
        return Observable.Create<Either<Exception, TResult>>(
            observer =>
                {
                    var disposable = new CompositeDisposable();
                    var funcDisposable = new SerialDisposable();
                    bool cancelRequested = false;
                    disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
                    disposable.Add(funcDisposable);
                    disposable.Add(scheduler.Schedule(interval, self =>
                        {
                            funcDisposable.Disposable = asyncFunction(parameterFactory())
                                .Finally(() =>
                                    {
                                        if (!cancelRequested) self(interval);
                                    })
                                .Subscribe(
                                    res => observer.OnNext(Either.Right<Exception, TResult>(res)),
                                    ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
                        }));

                    return disposable;

                });
    }


请参阅我的其他答案,以获取另一种避免.NET 4.5异步/等待功能且不使用计划调用的方法。

我确实希望这对rx-java家伙有所帮助!

关于c# - 如何使用Observables实现轮询?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19547880/

相关文章:

c# - CopyToAsync 创建零长度文件(因为应用程序正在关闭)

c# - 异步显示对话框

ios - swift 中的ConfigureAwait 方法的模拟

c# - Entity Framework : Dependencies due to foreign key constraints

C# XML 常量

c# - 自定义 id 字段时 C# 中的 MongoDB 反序列化

c# - 如何将 TestScheduler 与 ReplaySubject 时间窗一起使用

c# - 如何在泛型方法调用中使用类型变量 (C#)

c# - 响应式扩展 .MaxBy

c# - 将 IObservable<IEnumerable<T>> 转换为 IEnumerable<IObservable<T>>