c# - 在订阅中调用 Task.Factory.StartNew(async () => {}) 通常是可疑的吗?

标签 c# asynchronous system.reactive orleans

我有一种情况,我需要使用自定义调度程序来运行任务(这些需要是任务)并且调度程序没有设置同步上下文(所以没有 ObserveOnSubscribeOn SynchronizationContextScheduler 等。我收集)。以下是我最终的做法。现在,我想知道,我不确定这是否是进行异步调用和等待结果的最合适方式。这样可以吗,还是有更强大或更惯用的方式?

var orleansScheduler = TaskScheduler.Current;
var someObservable = ...;
someObservable.Subscribe(i =>
{
    Task.Factory.StartNew(async () =>
    {
        return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);          
});

如果不需要等待怎么办?

我找到了一个具体的简化示例来说明我正在做的事情 here .基本上我在 Orleans 使用 Rx,上面的代码是我正在做的事情的简单说明。虽然我也对一般情况感兴趣。

最终代码 事实证明,这在奥尔良的背景下有点棘手。我不知道如何才能使用 ObserveOn,这正是我想要使用的东西。问题是,通过使用它,Subscribe 将永远不会被调用。代码:

var orleansScheduler = TaskScheduler.Current;
var factory = new TaskFactory(orleansScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
var someObservable = ...;
someObservable
//.ObserveOn(rxScheduler) This doesn't look like useful since...
.SelectMany(i =>
{
    //... we need to set the custom scheduler here explicitly anyway.
    //See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/.
    //Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which
    //in turn runs always on .NET ThreadPool and not on Orleans scheduler and hence
    //the following .Subscribe wouldn't be called. 
    return Task.Factory.StartNew(async () =>
    { 
       //In reality this is an asynchronous grain call. Doing the "shorthand way"
       //(and optionally using ObserveOn) would get the grain called, but not the
       //following .Subscribe. 
       return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
})
.Subscribe(i =>
{
    Trace.WriteLine(i);
});

另外,在 Codeplex Orleans forums 的相关线程的链接.

最佳答案

我强烈建议不要对任何现代代码使用 StartNew。它确实有一个用例,但非常罕见。

如果您必须使用自定义任务调度程序,我建议将 ObserveOnTaskPoolScheduler 结合使用,该 TaskPoolScheduler 是围绕您的调度程序构建的 TaskFactory 包装器。这是一个满口的,所以这是一般的想法:

var factory = new TaskFactory(customScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...

然后您可以使用 SelectMany 在源流中的每个事件到达时启动异步操作。

另一种不太理想的解决方案是为您的订阅“事件”使用 async void。这是可以接受的,但你必须注意你的错误处理。作为一般规则,不允许异常从 async void 方法传播出去。

还有第三种选择,您可以将一个可观察对象挂接到 TPL 数据流 block 中。像 ActionBlock 这样的 block 可以指定其任务调度程序,Dataflow 自然理解异步处理程序。请注意,默认情况下,Dataflow block 将限制一次对单个元素的处理。

关于c# - 在订阅中调用 Task.Factory.StartNew(async () => {}) 通常是可疑的吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26874576/

相关文章:

javascript - Nodejs 为什么 await 只限于异步函数?

javascript - JavaScript 中的异步原语是什么?

system.reactive - 当最后一个观察者退订时,我该如何创建一个Rx可观察的站来停止发布事件?

c# - WPF:将列表动态绑定(bind)到(某些)对象的属性

c# - Entity Framework Core : Is it safe to delete Migration. Designer.cs 如果我们永远不会恢复迁移?

Swift - 大中央调度类

c# - 从 Event 创建 Observable 时出错

c# - JObject JSON 解析嵌套对象

c# - 将 Windows 窗体单元测试迁移到 WPF

c# - 如何制作一个只能订阅一次的轻量级 `Replay`算子?