c# - 任务排序和重新进入

标签 c# .net asynchronous task-parallel-library async-await

我有以下情况,我认为这可能很常见:

  1. 有一个任务(UI 命令处理程序)可以同步或异步完成。

  2. 命令到达的速度可能比处理它们的速度快。

  3. 如果某个命令已经有待处理的任务,则新的命令处理程序任务应按顺序排队和处理。

  4. 每个新任务的结果可能取决于前一个任务的结果。

取消应该被观察到,但为了简单起见,我想将它留在这个问题的范围之外。此外,线程安全(并发)不是必需的,但必须支持可重入。

这是我试图实现的基本示例(为简单起见,作为控制台应用程序):

using System;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var asyncOp = new AsyncOp<int>();

            Func<int, Task<int>> handleAsync = async (arg) =>
            {
                Console.WriteLine("this task arg: " + arg);

                //await Task.Delay(arg); // make it async

                return await Task.FromResult(arg); // sync
            };

            Console.WriteLine("Test #1...");
            asyncOp.RunAsync(() => handleAsync(1000));
            asyncOp.RunAsync(() => handleAsync(900));
            asyncOp.RunAsync(() => handleAsync(800));
            asyncOp.CurrentTask.Wait();

            Console.WriteLine("\nPress any key to continue to test #2...");
            Console.ReadLine();

            asyncOp.RunAsync(() =>
            {
                asyncOp.RunAsync(() => handleAsync(200));
                return handleAsync(100);
            });

            asyncOp.CurrentTask.Wait();
            Console.WriteLine("\nPress any key to exit...");
            Console.ReadLine();
        }

        // AsyncOp
        class AsyncOp<T>
        {
            Task<T> _pending = Task.FromResult(default(T));

            public Task<T> CurrentTask { get { return _pending; } }

            public Task<T> RunAsync(Func<Task<T>> handler)
            {
                var pending = _pending;
                Func<Task<T>> wrapper = async () =>
                {
                    // await the prev task
                    var prevResult = await pending;
                    Console.WriteLine("\nprev task result:  " + prevResult);
                    // start and await the handler
                    return await handler();
                };

                _pending = wrapper();
                return _pending;
            }
        }

    }
}

输出:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800

prev task result:  800
this task arg: 200
this task arg: 100

Press any key to exit...

It works in accordance with the requirements, until re-entrancy is introduced in test #2:

asyncOp.RunAsync(() =>
{
    asyncOp.RunAsync(() => handleAsync(200));
    return handleAsync(100);
});

期望的输出应该是100200,而不是200100,因为已经有一个100 的未决外部任务。这显然是因为内部任务是同步执行的,打破了逻辑 var pending = _pending;/* ... */_pending = wrapper() 用于外部任务。

如何让它也适用于测试#2?

一种解决方案是使用 Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext() 为每个任务强制执行异步。但是,我不想强​​制异步执行命令处理程序可能在内部是同步的。此外,我不想依赖于任何特定同步上下文的行为(即依赖于 Task.Factory.StartNew 应该在创建的任务完成之前返回实际上已经开始了)。

在实际项目中,我负责上面的 AsyncOp,但无法控制命令处理程序(即 handleAsync 中的任何内容) .

最佳答案

我差点忘了构造 Task 是可能的手动,无需启动或安排。然后,"Task.Factory.StartNew" vs "new Task(...).Start"让我回到正轨。我认为这是少数情况之一 Task<TResult> 构造函数实际上可能很有用,还有嵌套任务(Task<Task<T>>)和 Task.Unwrap() :

// AsyncOp
class AsyncOp<T>
{
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask { get { return _pending; } }

    public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
    {
        var pending = _pending;
        Func<Task<T>> wrapper = async () =>
        {
            // await the prev task
            var prevResult = await pending;
            Console.WriteLine("\nprev task result:  " + prevResult);
            // start and await the handler
            return await handler();
        };

        var task = new Task<Task<T>>(wrapper);
        var inner = task.Unwrap();
        _pending = inner;

        task.RunSynchronously(useSynchronizationContext ?
            TaskScheduler.FromCurrentSynchronizationContext() :
            TaskScheduler.Current);

        return inner;
    }
}

输出:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800
this task arg: 100

prev task result:  100
this task arg: 200

现在也很容易制作AsyncOp通过添加 lock 实现线程安全保护_pending ,如果需要的话。

已更新cancel/restart logic 进一步改进了这一点.

关于c# - 任务排序和重新进入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21424084/

相关文章:

.net - VS 2010 Performance Profiler : Identifying Threads

c# - Pdf sharp 字体样式 Bold,Italic and Underline together

javascript - javascript的执行流程(同步或异步)

c# - 如何在 IIS 中正确托管连接到 SQLServer 的 WCF 数据服务?为什么我会收到错误?

c# - 这怎么会发生?

c# - 如何在C#中设置组合框的选定值

.net - 网站编译在 MSBuild 中失败,但在 Visual Studio 中有效

javascript - 在初始化路由之前发送 $http 请求?

Java 多线程与 CompletableFuture 运行速度较慢

c# - 不能修改 Dictionary 的返回值,因为它不是变量