c# - Async/await 作为协程的替代品

标签 c# asynchronous async-await coroutine

我使用 C# 迭代器替代协程,它一直运行良好。我想切换到 async/await,因为我认为它的语法更清晰并且它给了我类型安全性。 In this (outdated) blog post, Jon Skeet shows a possible way to implement it .

我选择了一种稍微不同的方式(通过实现我自己的 SynchronizationContext 并使用 Task.Yield)。这很好用。

然后我意识到会有问题;目前协程不必完成运行。它可以在任何让步的地方优雅地停止。我们可能有这样的代码:

private IEnumerator Sleep(int milliseconds)
{
    Stopwatch timer = Stopwatch.StartNew();
    do
    {
        yield return null;
    }
    while (timer.ElapsedMilliseconds < milliseconds);
}

private IEnumerator CoroutineMain()
{
    try
    {
        // Do something that runs over several frames
        yield return Coroutine.Sleep(5000);
    }
    finally
    {
        Log("Coroutine finished, either after 5 seconds, or because it was stopped");
    }
}

协程通过跟踪堆栈中的所有枚举器来工作。 C# 编译器生成一个 Dispose 函数,可以调用该函数以确保在 CoroutineMain 中正确调用“finally” block ,即使枚举未完成也是如此。通过这种方式,我们可以优雅地停止协程,并仍然确保调用 finally block ,方法是对堆栈中的所有 IEnumerator 对象调用 Dispose。这基本上是手动展开。

当我用 async/await 编写我的实现时,我意识到我们会失去这个特性,除非我弄错了。然后我查找了其他协程解决方案,但 Jon Skeet 的版本似乎也没有以任何方式处理它。

我能想到的处理这个问题的唯一方法是拥有我们自己的自定义“Yield”函数,它会检查协程是否已停止,然后引发一个异常来指示这一点。这会向上传播,执行 finally block ,然后在根附近的某个地方被捕获。不过我不觉得这很漂亮,因为第 3 方代码可能会捕获异常。

我是不是误会了什么,这有可能以更简单的方式进行吗?或者我是否需要采用异常(exception)方式来执行此操作?

编辑:已请求更多信息/代码,所以这里有一些。我可以保证这将仅在单个线程上运行,因此这里不涉及线程。 我们当前的协程实现看起来有点像这样(这是简化的,但它适用于这种简单的情况):

public sealed class Coroutine : IDisposable
{
    private class RoutineState
    {
        public RoutineState(IEnumerator enumerator)
        {
            Enumerator = enumerator;
        }

        public IEnumerator Enumerator { get; private set; }
    }

    private readonly Stack<RoutineState> _enumStack = new Stack<RoutineState>();

    public Coroutine(IEnumerator enumerator)
    {
        _enumStack.Push(new RoutineState(enumerator));
    }

    public bool IsDisposed { get; private set; }

    public void Dispose()
    {
        if (IsDisposed)
            return;

        while (_enumStack.Count > 0)
        {
            DisposeEnumerator(_enumStack.Pop().Enumerator);
        }

        IsDisposed = true;
    }

    public bool Resume()
    {
        while (true)
        {
            RoutineState top = _enumStack.Peek();
            bool movedNext;

            try
            {
                movedNext = top.Enumerator.MoveNext();
            }
            catch (Exception ex)
            {
                // Handle exception thrown by coroutine
                throw;
            }

            if (!movedNext)
            {
                // We finished this (sub-)routine, so remove it from the stack
                _enumStack.Pop();

                // Clean up..
                DisposeEnumerator(top.Enumerator);


                if (_enumStack.Count <= 0)
                {
                    // This was the outer routine, so coroutine is finished.
                    return false;
                }

                // Go back and execute the parent.
                continue;
            }

            // We executed a step in this coroutine. Check if a subroutine is supposed to run..
            object value = top.Enumerator.Current;
            IEnumerator newEnum = value as IEnumerator;
            if (newEnum != null)
            {
                // Our current enumerator yielded a new enumerator, which is a subroutine.
                // Push our new subroutine and run the first iteration immediately
                RoutineState newState = new RoutineState(newEnum);
                _enumStack.Push(newState);

                continue;
            }

            // An actual result was yielded, so we've completed an iteration/step.
            return true;
        }
    }

    private static void DisposeEnumerator(IEnumerator enumerator)
    {
        IDisposable disposable = enumerator as IDisposable;
        if (disposable != null)
            disposable.Dispose();
    }
}

假设我们有如下代码:

private IEnumerator MoveToPlayer()
{
  try
  {
    while (!AtPlayer())
    {
      yield return Sleep(500); // Move towards player twice every second
      CalculatePosition();
    }
  }
  finally
  {
    Log("MoveTo Finally");
  }
}

private IEnumerator OrbLogic()
{
  try
  {
    yield return MoveToPlayer();
    yield return MakeExplosion();
  }
  finally
  {
    Log("OrbLogic Finally");
  }
}

这将通过将 OrbLogic 枚举器的实例传递给协程,然后运行它来创建。这允许我们在每一帧都勾选协程。 如果玩家杀死球体,协程不会结束运行; Dispose 只是在协程上调用。如果 MoveTo 在逻辑上位于“try” block 中,那么在顶部 IEnumerator 上调用 Dispose 将在语义上使 finally block 位于 MoveTo 执行。然后 OrbLogic 中的 finally block 将执行。 请注意,这是一个简单的案例,案例要复杂得多。

我正在努力在异步/等待版本中实现类似的行为。此版本的代码如下所示(省略了错误检查):

public class Coroutine
{
    private readonly CoroutineSynchronizationContext _syncContext = new CoroutineSynchronizationContext();

    public Coroutine(Action action)
    {
        if (action == null)
            throw new ArgumentNullException("action");

        _syncContext.Next = new CoroutineSynchronizationContext.Continuation(state => action(), null);
    }

    public bool IsFinished { get { return !_syncContext.Next.HasValue; } }

    public void Tick()
    {
        if (IsFinished)
            throw new InvalidOperationException("Cannot resume Coroutine that has finished");

        SynchronizationContext curContext = SynchronizationContext.Current;
        try
        {
            SynchronizationContext.SetSynchronizationContext(_syncContext);

            // Next is guaranteed to have value because of the IsFinished check
            Debug.Assert(_syncContext.Next.HasValue);

            // Invoke next continuation
            var next = _syncContext.Next.Value;
            _syncContext.Next = null;

            next.Invoke();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(curContext);
        }
    }
}

public class CoroutineSynchronizationContext : SynchronizationContext
{
    internal struct Continuation
    {
        public Continuation(SendOrPostCallback callback, object state)
        {
            Callback = callback;
            State = state;
        }

        public SendOrPostCallback Callback;
        public object State;

        public void Invoke()
        {
            Callback(State);
        }
    }

    internal Continuation? Next { get; set; }

    public override void Post(SendOrPostCallback callback, object state)
    {
        if (callback == null)
            throw new ArgumentNullException("callback");

        if (Current != this)
            throw new InvalidOperationException("Cannot Post to CoroutineSynchronizationContext from different thread!");

        Next = new Continuation(callback, state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotSupportedException();
    }

    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
    {
        throw new NotSupportedException();
    }

    public override SynchronizationContext CreateCopy()
    {
        throw new NotSupportedException();
    }
}

我看不出如何使用它来实现与迭代器版本类似的行为。 提前为冗长的代码道歉!

编辑 2:新方法似乎有效。它允许我做类似的事情:

private static async Task Test()
{
    // Second resume
    await Sleep(1000);
    // Unknown how many resumes
}

private static async Task Main()
{
    // First resume
    await Coroutine.Yield();
    // Second resume
    await Test();
}

这提供了一种为游戏构建 AI 的非常好的方法。

最佳答案

已更新,后续博文: Asynchronous coroutines with C# 8.0 and IAsyncEnumerable .


I use C# iterators as a replacement for coroutines, and it has been working great. I want to switch to async/await as I think the syntax is cleaner and it gives me type safety...

IMO,这是一个非常有趣的问题,尽管我花了一段时间才完全理解它。也许,您没有提供足够的示例代码来说明这个概念。一个完整的应用程序会有所帮助,所以我会先尝试填补这个空白。下面的代码说明了我理解的使用模式,如果我错了请指正:

using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    // https://stackoverflow.com/q/22852251/1768303

    public class Program
    {
        class Resource : IDisposable
        {
            public void Dispose()
            {
                Console.WriteLine("Resource.Dispose");
            }

            ~Resource()
            {
                Console.WriteLine("~Resource");
            }
        }

        private IEnumerator Sleep(int milliseconds)
        {
            using (var resource = new Resource())
            {
                Stopwatch timer = Stopwatch.StartNew();
                do
                {
                    yield return null;
                }
                while (timer.ElapsedMilliseconds < milliseconds);
            }
        }

        void EnumeratorTest()
        {
            var enumerator = Sleep(100);
            enumerator.MoveNext();
            Thread.Sleep(500);
            //while (e.MoveNext());
            ((IDisposable)enumerator).Dispose();
        }

        public static void Main(string[] args)
        {
            new Program().EnumeratorTest();
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
            GC.WaitForPendingFinalizers();
            Console.ReadLine();
        }
    }
}

此处,Resource.Dispose((IDisposable)enumerator).Dispose() 而被调用。如果我们不调用 enumerator.Dispose(),那么我们将不得不取消注释 //while (e.MoveNext()); 并让迭代器优雅地完成, 以进行适当的展开。

现在,我认为用 async/await 实现它的最好方法是使用 custom awaiter :

using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    // https://stackoverflow.com/q/22852251/1768303
    public class Program
    {
        class Resource : IDisposable
        {
            public void Dispose()
            {
                Console.WriteLine("Resource.Dispose");
            }

            ~Resource()
            {
                Console.WriteLine("~Resource");
            }
        }

        async Task SleepAsync(int milliseconds, Awaiter awaiter)
        {
            using (var resource = new Resource())
            {
                Stopwatch timer = Stopwatch.StartNew();
                do
                {
                    await awaiter;
                }
                while (timer.ElapsedMilliseconds < milliseconds);
            }
            Console.WriteLine("Exit SleepAsync");
        }

        void AwaiterTest()
        {
            var awaiter = new Awaiter();
            var task = SleepAsync(100, awaiter);
            awaiter.MoveNext();
            Thread.Sleep(500);

            //while (awaiter.MoveNext()) ;
            awaiter.Dispose();
            task.Dispose();
        }

        public static void Main(string[] args)
        {
            new Program().AwaiterTest();
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
            GC.WaitForPendingFinalizers();
            Console.ReadLine();
        }

        // custom awaiter
        public class Awaiter :
            System.Runtime.CompilerServices.INotifyCompletion,
            IDisposable
        {
            Action _continuation;
            readonly CancellationTokenSource _cts = new CancellationTokenSource();

            public Awaiter()
            {
                Console.WriteLine("Awaiter()");
            }

            ~Awaiter()
            {
                Console.WriteLine("~Awaiter()");
            }

            public void Cancel()
            {
                _cts.Cancel();
            }

            // let the client observe cancellation
            public CancellationToken Token { get { return _cts.Token; } }

            // resume after await, called upon external event
            public bool MoveNext()
            {
                if (_continuation == null)
                    return false;

                var continuation = _continuation;
                _continuation = null;
                continuation();
                return _continuation != null;
            }

            // custom Awaiter methods
            public Awaiter GetAwaiter()
            {
                return this;
            }

            public bool IsCompleted
            {
                get { return false; }
            }

            public void GetResult()
            {
                this.Token.ThrowIfCancellationRequested();
            }

            // INotifyCompletion
            public void OnCompleted(Action continuation)
            {
                _continuation = continuation;
            }

            // IDispose
            public void Dispose()
            {
                Console.WriteLine("Awaiter.Dispose()");
                if (_continuation != null)
                {
                    Cancel();
                    MoveNext();
                }
            }
        }
    }
}

当需要放松时,我在 Awaiter.Dispose 中请求取消并将状态机驱动到下一步(如果有挂起的继续)。这导致观察到 Awaiter.GetResult 中的取消(由编译器生成的代码调用)。这会抛出 TaskCanceledException 并进一步展开 using 语句。因此,Resource 得到了正确处理。最后,任务转换为取消状态 (task.IsCancelled == true)。

IMO,这是比在当前线程上安装自定义同步上下文更简单直接的方法。它可以很容易地适应多线程(更多细节 here )。

IEnumerator/yield 相比,这确实会给您更多的自由。您可以在协程逻辑中使用 try/catch,您可以直接通过 Task 对象观察异常、取消和结果。

已更新,据我所知,当谈到 async 状态机时,迭代器生成的 IDispose 没有类比。当你想取消/解除它时,你真的必须将状态机驱动到尽头。如果你想解释 try/catch 的一些疏忽使用以防止取消,我认为你能做的最好的事情就是检查 _continuation 是否在 Awaiter.Cancel(在 MoveNext 之后)并抛出致命异常 out-of-the-band (使用辅助 async void 方法)。

关于c# - Async/await 作为协程的替代品,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22852251/

相关文章:

c# - 在 C# 中使用 SMO 从 ExecuteNonQuery 语句获取错误消息

javascript - 我应该如何使用 node.js 和 Mongodb 在 Javascript 中实现异步函数?

c# - 使用 Task.Delay 或 Task.Run 启动任务的区别

javascript - 如何将以下 "then"转而使用 await/async(map 函数)?

java - Java中网络调用列表和处理任务的优化编排

c# - 异步处理从数据库引擎返回的实体

C# Dll注入(inject)器,VB.Net Dll注入(inject)器

c# - 如何改变不同工作地点的连接字符串

c# - 在 Unity3D 中无延迟地截取屏幕截图

node.js - Nodejs 中的 jQuery.when() 相当于什么?