我有以下情况,我认为这可能很常见:
有一个任务(UI 命令处理程序)可以同步或异步完成。
命令到达的速度可能比处理它们的速度快。
如果某个命令已经有待处理的任务,则新的命令处理程序任务应按顺序排队和处理。
每个新任务的结果可能取决于前一个任务的结果。
取消应该被观察到,但为了简单起见,我想将它留在这个问题的范围之外。此外,线程安全(并发)不是必需的,但必须支持可重入。
这是我试图实现的基本示例(为简单起见,作为控制台应用程序):
using System;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var asyncOp = new AsyncOp<int>();
Func<int, Task<int>> handleAsync = async (arg) =>
{
Console.WriteLine("this task arg: " + arg);
//await Task.Delay(arg); // make it async
return await Task.FromResult(arg); // sync
};
Console.WriteLine("Test #1...");
asyncOp.RunAsync(() => handleAsync(1000));
asyncOp.RunAsync(() => handleAsync(900));
asyncOp.RunAsync(() => handleAsync(800));
asyncOp.CurrentTask.Wait();
Console.WriteLine("\nPress any key to continue to test #2...");
Console.ReadLine();
asyncOp.RunAsync(() =>
{
asyncOp.RunAsync(() => handleAsync(200));
return handleAsync(100);
});
asyncOp.CurrentTask.Wait();
Console.WriteLine("\nPress any key to exit...");
Console.ReadLine();
}
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("\nprev task result: " + prevResult);
// start and await the handler
return await handler();
};
_pending = wrapper();
return _pending;
}
}
}
}
输出:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 prev task result: 800 this task arg: 200 this task arg: 100 Press any key to exit...
It works in accordance with the requirements, until re-entrancy is introduced in test #2:
asyncOp.RunAsync(() =>
{
asyncOp.RunAsync(() => handleAsync(200));
return handleAsync(100);
});
期望的输出应该是100
、200
,而不是200
、100
,因为已经有一个100
的未决外部任务。这显然是因为内部任务是同步执行的,打破了逻辑 var pending = _pending;/* ... */_pending = wrapper()
用于外部任务。
如何让它也适用于测试#2?
一种解决方案是使用 Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()
为每个任务强制执行异步。但是,我不想强制异步执行命令处理程序可能在内部是同步的。此外,我不想依赖于任何特定同步上下文的行为(即依赖于 Task.Factory.StartNew
应该在创建的任务完成之前返回实际上已经开始了)。
在实际项目中,我负责上面的 AsyncOp
,但无法控制命令处理程序(即 handleAsync
中的任何内容) .
最佳答案
我差点忘了构造 Task
是可能的手动,无需启动或安排。然后,"Task.Factory.StartNew" vs "new Task(...).Start"让我回到正轨。我认为这是少数情况之一 Task<TResult>
构造函数实际上可能很有用,还有嵌套任务(Task<Task<T>>
)和 Task.Unwrap()
:
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("\nprev task result: " + prevResult);
// start and await the handler
return await handler();
};
var task = new Task<Task<T>>(wrapper);
var inner = task.Unwrap();
_pending = inner;
task.RunSynchronously(useSynchronizationContext ?
TaskScheduler.FromCurrentSynchronizationContext() :
TaskScheduler.Current);
return inner;
}
}
输出:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 this task arg: 100 prev task result: 100 this task arg: 200
现在也很容易制作AsyncOp
通过添加 lock
实现线程安全保护_pending
,如果需要的话。
已更新,cancel/restart logic 进一步改进了这一点.
关于c# - 任务排序和重新进入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21424084/