我有一个封装了 Observable.Sample()
的类,例如:
class IntervalRequestScheduler
{
private Subject<Action> _requests = new Subject<Action>();
private IDisposable _observable;
public IntervalRequestScheduler(TimeSpan requestLimit)
{
_observable = _requests.Sample(requestLimit)
.Subscribe(action => action());
}
public Task<T> ScheduleRequest<T>(Func<Task<T>> request)
{
var tcs = new TaskCompletionSource<T>();
_requests.OnNext(async () =>
{
try
{
T result = await request();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
});
return tcs.Task;
}
}
如何正确测试它?我所有的尝试要么过早退出,要么导致死锁。
最佳答案
在 Rx 中控制时间
单元测试 Rx 的关键是了解如何使用 TestScheduler
控制时间. Rx 库中所有基于时间的运算符都采用可选的 IScheduler
。参数以便让您执行此操作。您的基于时间的运算符也应该这样做。
所以我们要做的第一件事就是修改你的IntervalRequestScheduler
构造函数来促进这一点:
public IntervalRequestScheduler(TimeSpan requestLimit,
// The scheduler is optional
IScheduler scheduler = null)
{
// assign a default if necessary
scheduler = scheduler ?? Scheduler.Default;
// make sure to pass the scheduler in to `Sample`
_observable = _requests.Sample(requestLimit, scheduler)
.Subscribe(action => action());
}
有了这个改变,我们现在可以控制时间了!
这是一个示例单元测试,它将调用 IntervalRequestScheduler
实例的 ScheduleRequest
方法十次 - 然后将时间提前一秒的样本持续时间并检查是否只有一个任务已完成:
[Test]
public void ASingleTaskIsCompletedWhenTenAreScheduledWithinInterval()
{
var scheduler = new TestScheduler();
var sampleDuration = TimeSpan.FromSeconds(1);
var intervalRequestScheduler = new IntervalRequestScheduler(sampleDuration,
scheduler);
// use a helper method to create "requests"
var taskFactories = Enumerable.Range(0, 10).Select(CreateRequest);
// schedule the requests and collect the tasks into an array
var tasks =
(from tf in taskFactories
select intervalRequestScheduler.ScheduleRequest(tf)).ToArray();
// prove no tasks have completed
var completedTasksCount = tasks.Count(t => t.IsCompleted);
Assert.AreEqual(0, completedTasksCount);
// this is the key - we advance time simulating a sampling period.
scheduler.AdvanceBy(sampleDuration.Ticks);
// now we see exactly one task has completed
completedTasksCount = tasks.Count(t => t.IsCompleted);
Assert.AreEqual(1, completedTasksCount);
}
// helper to create requests
public Func<Task<int>> CreateRequest(int result)
{
return () => Task.Run(() => result);
}
一边
到目前为止,我只是专注于手头的问题 - 但我确实想补充一点 IntervalRequestScheduler
的实际动机有点不清楚,代码看起来有点乱。在不混合包装任务和 IObservables 的情况下,可能有更好的方法来实现这一点。留在 Rx 世界中还可以更轻松地通过控制所涉及的调度程序来使测试可预测。在上面的代码中,我忽略了一些肮脏的地方,因为任务调用是异步的,并且一个启动的任务可能在您测试它时实际上尚未完成 - 所以为了绝对正确,您需要进入监控任务并为它们的开始和完成留出时间的杂乱事务。但希望您能看到 TestScheduler 避免了 Rx 端的所有这些困惑。
如果您想将运行的作业数量限制在特定速率,为什么不对输入进行采样并投影输出呢?
例如 - 假设您提交了一个类型为 Func<int,int>
的请求函数称为 runRequest
和一个 IObservable<int> requests
输入流提供每个请求的输入(例如可以是 Subject<int>
)。那么你可以:
requests.Sample(TimeSpan.FromSeconds(1), scheduler)
.Select(input => request(input))
.Subscribe(result => /* DoSomethingWithResult */);
当然不知道这是否适用于您的场景,但它可能会激发一些想法!
关于c# - 如何对 Observable.Sample() 进行单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26189336/