我有一种情况,我需要使用自定义调度程序来运行任务(这些需要是任务)并且调度程序没有设置同步上下文(所以没有 ObserveOn
,SubscribeOn
、SynchronizationContextScheduler
等。我收集)。以下是我最终的做法。现在,我想知道,我不确定这是否是进行异步调用和等待结果的最合适方式。这样可以吗,还是有更强大或更惯用的方式?
var orleansScheduler = TaskScheduler.Current;
var someObservable = ...;
someObservable.Subscribe(i =>
{
Task.Factory.StartNew(async () =>
{
return await AsynchronousOperation(i);
}, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);
});
如果不需要等待怎么办?
最终代码
事实证明,这在奥尔良的背景下有点棘手。我不知道如何才能使用 ObserveOn
,这正是我想要使用的东西。问题是,通过使用它,Subscribe
将永远不会被调用。代码:
var orleansScheduler = TaskScheduler.Current;
var factory = new TaskFactory(orleansScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
var someObservable = ...;
someObservable
//.ObserveOn(rxScheduler) This doesn't look like useful since...
.SelectMany(i =>
{
//... we need to set the custom scheduler here explicitly anyway.
//See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/.
//Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which
//in turn runs always on .NET ThreadPool and not on Orleans scheduler and hence
//the following .Subscribe wouldn't be called.
return Task.Factory.StartNew(async () =>
{
//In reality this is an asynchronous grain call. Doing the "shorthand way"
//(and optionally using ObserveOn) would get the grain called, but not the
//following .Subscribe.
return await AsynchronousOperation(i);
}, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
})
.Subscribe(i =>
{
Trace.WriteLine(i);
});
另外,在 Codeplex Orleans forums 的相关线程的链接.
最佳答案
我强烈建议不要对任何现代代码使用 StartNew
。它确实有一个用例,但非常罕见。
如果您必须使用自定义任务调度程序,我建议将 ObserveOn
与 TaskPoolScheduler
结合使用,该 TaskPoolScheduler
是围绕您的调度程序构建的 TaskFactory
包装器。这是一个满口的,所以这是一般的想法:
var factory = new TaskFactory(customScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...
然后您可以使用 SelectMany
在源流中的每个事件到达时启动异步操作。
另一种不太理想的解决方案是为您的订阅“事件”使用 async void
。这是可以接受的,但你必须注意你的错误处理。作为一般规则,不允许异常从 async void 方法传播出去。
还有第三种选择,您可以将一个可观察对象挂接到 TPL 数据流 block 中。像 ActionBlock
这样的 block 可以指定其任务调度程序,Dataflow 自然理解异步处理程序。请注意,默认情况下,Dataflow block 将限制一次对单个元素的处理。
关于c# - 在订阅中调用 Task.Factory.StartNew(async () => {}) 通常是可疑的吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26874576/