c# - 如何在异步场景中测试 Rx observable?

标签 c# asynchronous system.reactive reactive-programming observable

我已将问题分解为一个示例。 处理器执行异步操作并通过 Rx observable 发布结果。

public class Tests
{
    [Test]
    public async Task Receiving_an_async_result_should_succeed()
    {
        var p = new Processor();

        p.Process("message");

        var res = await p.Results;

        res.Should().NotBeNullOrEmpty();
    }
}

public class Processor
{
    private readonly ReplaySubject<string> _subject = new ReplaySubject<string>();

    public async void Process(string message)
    {
        try
        {
            // Fake some async processing here!
            var r = await Task.Factory.StartNew(() => message.Reverse().ToString());

            _subject.OnNext(r);
        }
        catch (Exception e)
        {
            _subject.OnError(e);
        }
    }

    public IObservable<string> Results { get { return _subject; } }
}

这里的问题是我不知道如何正确设置测试用例。看来期待的结果永远不会到来!

最佳答案

结果是IObservable<T> - 因此,直到发送 OnCompleted() 后,它才完成。 。这就是为什么你的等待没有返回。

我怀疑您只想等待第一个结果(看起来您打算重复调用 Process) - 所以如果您更改此行:

var res = await p.Results;

对此:

var res = await p.Results.Take(1);

你将会有更多的运气。

关于c# - 如何在异步场景中测试 Rx observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20348549/

相关文章:

c# - 将 C# 二维数组格式化为 Javascript

c# - await Task.Delay() 不适用于 Xamarin.iOS 使用 MVVMCross

javascript - RxJs:拖放示例:添加 mousedragstart

c# - Reactive 的 "Buffer until quiet"行为?

events - F# 响应式(Reactive)订阅来自 COM 的非标准事件

C# InvalidArgument= '1' 的值对于 'index' 无效

c# - 如何将 Unicode 字符写入控制台?

c# - XmlSerializer 没有序列化两个枚举属性

java - 使用异步 servlet 时 Thread.Sleep() 的替代方案?

javascript - node.js async.series 是它应该如何工作的?