我有两个 .net Task 对象,我可能希望它们并行或顺序运行。无论哪种情况,我都不想阻塞线程来等待它们。事实证明,Reactive Extensions让平行故事变得简单美好。但是当我尝试按顺序安排任务时,代码可以工作但感觉很尴尬。
我想知道是否有人可以展示如何使顺序版本更简洁或像并行版本一样轻松编码。没有必要使用响应式扩展来回答这个问题。
作为引用,这是我的并行和顺序处理的两个解决方案。
并行处理版本
这是纯粹的快乐:
public Task<string> DoWorkInParallel()
{
var result = new TaskCompletionSource<string>();
Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
Task<bool> BravoTask = Task.Factory.StartNew(() => true);
//Prepare for Rx, and set filters to allow 'Zip' to terminate early
//in some cases.
IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);
Observable
.Zip(
AsyncAlpha,
AsyncBravo,
(x, y) => y.ToString() + x.ToString())
.Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
(x) => { result.TrySetResult(x); },
(x) => { result.TrySetException(x.GetBaseException()); },
() => { result.TrySetResult("Nothing"); });
return result.Task;
}
顺序/流水线处理版本
这可行但很笨拙:
public Task<string> DoWorkInSequence()
{
var result = new TaskCompletionSource<string>();
Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
AlphaTask.ContinueWith(x =>
{
if (x.IsFaulted)
{
result.TrySetException(x.Exception.GetBaseException());
}
else
{
if (x.Result != 5)
{
Task<bool> BravoTask = Task.Factory.StartNew(() => true);
BravoTask.ContinueWith(y =>
{
if (y.IsFaulted)
{
result.TrySetException(y.Exception.GetBaseException());
}
else
{
if (y.Result)
{
result.TrySetResult(x.Result.ToString() + y.Result.ToString());
}
else
{
result.TrySetResult("Nothing");
}
}
});
}
else
{
result.TrySetResult("Nothing");
}
}
}
);
return result.Task;
}
在上面的顺序代码中,它变得一团糟,我什至没有添加timeout capability。匹配水货版!
要求(8 月 6 日更新)
回答者请注意:
顺序方案应允许第一个任务的输出提供第二个任务的输入的安排。我上面的示例“笨拙”代码很容易被安排来实现这一目标。
我对 .net 4.5 的答案感兴趣 - 但 .net 4.0 的答案对我来说同样重要或更重要。
任务“Alpha”和“Bravo”的总时限为 200 毫秒;他们每个人都没有 200 毫秒。在顺序情况下也是如此。
如果任一任务返回无效结果,则 SourceCompletionTask 必须在两个任务完成之前提前完成。如示例代码中的显式测试所示,无效结果为 [AlphaTask:5] 或 [BravoTask:false]。
8 月 8 日更新:澄清 - 在顺序情况下,如果 AlphaTask 不成功或超时已经发生,则 BravoTask 根本不应执行。假设 AlphaTask 和 BravoTask 都不能阻塞。这并不重要,但在我的真实场景中,它们实际上是异步 WCF 服务调用。
也许我可以利用 Rx 的一个方面来清理顺序版本。但即使只是任务编程本身也应该有一个我想象的更好的故事。我们拭目以待。
勘误 在这两个代码示例中,我将返回类型更改为任务,因为张贴者的回答非常正确,我不应该返回 TaskCompletionSource。
最佳答案
如果你可以使用 async/await,Brandon 有一个很好的答案。如果您仍在使用 VS2010,我要清理顺序版本的第一件事就是获取一个扩展方法,例如 blog post 中描述的 Then
方法 Stephen Toub。如果您不使用 .NET 4.5,我还会实现 Task.FromResult
方法。有了这些,您可以获得:
public Task<string> DoWorkInSequence()
{
return Task.FromResult(4)
.Then(x =>
{ if (x != 5)
{
return Task.FromResult(true)
.Then(y =>
{ if (y)
{
return Task.FromResult(x.ToString() + y.ToString());
}
else
{
return Task.FromResult("Nothing");
}
});
}
else
{
return Task.FromResult("Nothing");
}
});
}
此外,您通常应该返回一个 Task 而不是 TaskCompletionSource(您可以通过调用 TaskCompletionSource 上的 .Task
获得),因为您不希望调用者在您要返回给他们的任务。
Brandon 的回答还提供了实现超时功能的好方法(针对缺少 async/await 关键字进行调整)。
编辑 为了减少箭头代码,我们可以实现更多的 LINQ 方法。先前链接的博客文章中提供了 SelectMany 实现。 LINQ 需要的其他方法是 Select 和 Where。一旦你完成了 Then 和 SelectMany,这些应该相当简单,但它们在这里:
public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate)
{
if (task == null) throw new ArgumentNullException("task");
if (predicate == null) throw new ArgumentNullException("predicate");
var tcs = new TaskCompletionSource<T>();
task.ContinueWith((completed) =>
{
if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled) tcs.TrySetCanceled();
else
{
try
{
if (predicate(completed.Result))
tcs.TrySetResult(completed.Result);
else
tcs.TrySetCanceled();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
});
return tcs.Task;
}
public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector)
{
if (task == null) throw new ArgumentNullException("task");
if (selector == null) throw new ArgumentNullException("selector");
var tcs = new TaskCompletionSource<TResult>();
task.ContinueWith((completed) =>
{
if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled) tcs.TrySetCanceled();
else
{
try
{
tcs.TrySetResult(selector(completed.Result));
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
});
return tcs.Task;
}
在那之后,最后一个非 LINQ 扩展方法允许使用在取消时返回默认值:
public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue)
{
if (task == null) throw new ArgumentNullException("task");
var tcs = new TaskCompletionSource<T>();
task.ContinueWith((completed) =>
{
if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled) tcs.TrySetResult(defaultValue);
else tcs.TrySetResult(completed.Result);
});
return tcs.Task;
}
以及新的和改进的 DoWork(无超时):
public static Task<string> DoWorkInSequence()
{
return (from x in Task_FromResult(5)
where x != 5
from y in Task_FromResult(true)
where y
select x.ToString() + y.ToString()
).IfCanceled("Nothing");
}
Brandon 的回答中的 Timeout 方法(一旦重写,如果不需要异步/等待)可以停留在链的末端以获得整体超时和/或如果你想在链中的每个步骤之后保持进一步的步骤一旦达到整体超时就运行。链中断的另一种可能性是让所有单独的步骤采用取消 token 并修改 Timeout 方法以采用 CancellationTokenSource 并在发生超时时取消它,以及抛出超时异常。
编辑(布伦特阿里亚斯)
根据您提出的绝妙想法,我设计了我认为是我的 POV 的最终答案。它基于 ParallelExtensionsExtras 的 nuget 包中的 .net 4.0 扩展方法。下面的示例添加了第三个任务,以帮助说明根据我规定的要求对顺序任务进行编程的“感觉”:
public Task<string> DoWorkInSequence()
{
var cts = new CancellationTokenSource();
Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); });
Task<int> AlphaTask = Task.Factory
.StartNew(() => 4 )
.Where(x => x != 5 && !cts.IsCancellationRequested);
Task<bool> BravoTask = AlphaTask
.Then(x => true)
.Where(x => x && !cts.IsCancellationRequested);
Task<int> DeltaTask = BravoTask
.Then(x => 7)
.Where(x => x != 8);
Task<string> final = Task.Factory
.WhenAny(DeltaTask, timer)
.ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion
? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString(): "Nothing");
//This is here just for experimentation. Placing it at different points
//above will have varying effects on what tasks were cancelled at a given point in time.
cts.Cancel();
return final;
}
我在这次讨论和共同努力中得出了一些关键的观察结果:
- 在琐碎的情况下使用“Then”扩展很好,但值得注意的是适用性有限。对于更复杂的情况,有必要将其替换为例如
.ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default)
。在我陈述的场景中将“Then”替换为“ContinueWith”时,添加OnlyOnRanToCompletion
选项至关重要。 - 在我的情况下,使用超时扩展最终是行不通的。这是因为它只会导致取消它立即附加到的任务,而不是取消序列中的所有先行任务实例。这就是我改用
StartNewDelayed(...)
策略并在每个Where
子句中添加快速取消检查的原因。 - 虽然 ParallelExtensionsExtras 库定义了您所使用的 LINQ to Tasks,但我认为最好远离 Tasks 的 LINQ-ish 外观。这是因为 LINQ 的任务是 highly esoteric ;它可能会使普通开发人员感到困惑。让他们理解异步编码已经够难的了。甚至 LINQ to Tasks 的作者也说“How useful this LINQ implementation is in practice is arguable,但至少它提供了一个有趣的思维练习。”是的,同意,一个有趣的思维练习。当然,我必须至少承认“Where”LINQ to Tasks 方法,因为它在我上面列出的解决方案中发挥了关键作用。
关于c# - 如何使顺序处理像并行处理一样简单,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18060646/