c# - 等待一个可观察的

标签 c# system.reactive async-await .net-4.5 c#-5.0

因此,在 C# 4.0 的悲惨日子里,我创建了以下“WorkflowExecutor”类,它通过入侵 IEnumerable 的“yield return”continuations 来等待 observables,从而允许 GUI 线程中的异步工作流。因此,下面的代码将在 button1Click 处启动一个简单的工作流来更新文本,等待您单击 button2,并在 1 秒后循环。

public sealed partial class Form1 : Form {
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();

    public Form1() {
        InitializeComponent();
    }

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
        Text = "Initializing";
        var scheduler = new ControlScheduler(this);
        while (true) {
            yield return scheduler.WaitTimer(1000);
            Text = "Waiting for Click";
            yield return _button2Subject;
            Text = "Click Detected!";
            yield return scheduler.WaitTimer(1000);
            Text = "Restarting";
        }
    }

    void button1_Click(object sender, EventArgs e) {
        _workflowExecutor.Run(CreateAsyncHandler());
    }

    void button2_Click(object sender, EventArgs e) {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e) {
        _workflowExecutor.Stop();
    }
}

public static class TimerHelper {
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) {
        return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default);
    }
}

public sealed class WorkflowExecutor {
    IEnumerator<IObservable<Unit>> _observables;
    IDisposable _subscription;

    public void Run(IEnumerable<IObservable<Unit>> actions) {
        _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator();
        Continue();
    }

    void Continue() {
        if (_subscription != null) {
            _subscription.Dispose();
        }
        if (_observables.MoveNext()) {
            _subscription = _observables.Current.Subscribe(_ => Continue());
        }
    }

    public void Stop() {
        Run(null);
    }
}

这个想法的聪明部分,使用“yield”延续来完成异步工作,取自 Daniel Earwicker 的 AsyncIOPipe 想法:http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/ ,然后我在其上添加了响应式框架。

现在我在使用 C# 5.0 中的异步功能重写它时遇到了麻烦,但看起来这应该是一件很简单的事情。当我将可观察对象转换为任务时,它们只运行一次,而 while 循环第二次崩溃。任何解决问题的帮助都会很棒。

综上所述,async/await 机制为我提供了哪些 WorkflowExecutor 没有提供的功能?有什么我可以用 async/await 做但我不能用 WorkflowExecutor 做的(给定类似数量的代码)吗?

最佳答案

正如 James 提到的,您可以从 Rx v2.0 Beta 开始等待 IObservable 序列。行为是返回最后一个元素(在 OnCompleted 之前),或者抛出观察到的 OnError。如果序列不包含任何元素,您将得到一个 InvalidOperationException。

注意使用这个,你可以获得所有其他想要的行为:

  • 通过等待 xs.FirstAsync() 获取第一个元素
  • 通过等待 xs.SingleAsync() 确保只有一个值
  • 当您对空序列没问题时,等待 xs.DefaultIfEmpty()
  • 要获取所有元素,请等待 xs.ToArray() 或等待 xs.ToList()

您可以做更奇特的事情,例如计算聚合结果,但使用 Do 和 Scan 观察中间值:

var xs = Observable.Range(0, 10, Scheduler.Default);

var res = xs.Scan((x, y) => x + y)
            .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); });

Console.WriteLine("Done! The sum is {0}", await res);

关于c# - 等待一个可观察的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10290156/

相关文章:

c# - 如何在 .NET Core ConsoleApp 中实例化 ServiceCollection 并使用 IHttpClientFactory?

c# - IObservable .Cast<>() 和协方差

silverlight - 如何在 Silverlight Web 客户端上执行 Rx Observable

f# - MailboxProcessor 只是复制 IObservable 吗?

c# - 等待任务的行为不符合我的预期

c# - 如何在C#中成功下载一个.exe文件?

c# - 从 .Net 4.6.1 单元测试引用 .Net 标准项目时缺少方法异常

C# ASP.NET Azure 移动服务 InvalidOperationException

c# - 将枚举转换为人类可读的值

c# - 如何聚合来自异步生产者的数据并将其写入文件?