c# - 参与者内部的异步 API 调用和异常

标签 c# akka.net

我知道 PipeTo , 但是 some stuff, like synchronous waiting on nested continuation, seems to go against the async & await way.

所以,我的第一个问题 [1] 是:这里有什么“魔法”吗,这样我们就可以在延续中同步等待嵌套任务,而且它最终仍然是异步的?

当我们处于 async 和 await 差异时,如何处理失败?

让我们创建一个简单的示例:

public static class AsyncOperations
{
    public async static Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(1000).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
        //return 42;
    }

    public async static Task<string> ConvertAsync(int number)
    {
        await Task.Delay(600).ConfigureAwait(false);
        return number + " :)";
    }
}

以“常规”、异步和等待的方式:

var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);

正如您所期望的那样,异常将从第一个操作中冒出来。

现在,让我们创建一个将处理这些异步操作的 actor。为了争论,假设 CalculateAnswerAsyncConvertAsync 应该作为一个完整的操作一个接一个地使用(例如,类似于 StreamWriter. WriteLineAsyncStreamWriter.FlushAsync 如果您只想将一行写入流)。

public sealed class AsyncTestActor : ReceiveActor
{
    public sealed class Start
    {
    }

    public sealed class OperationResult
    {
        private readonly string message;

        public OperationResult(string message)
        {
            this.message = message;
        }

        public string Message
        {
            get { return message; }
        }
    }

    public AsyncTestActor()
    {
        Receive<Start>(msg =>
               {
                   AsyncOperations.CalculateAnswerAsync()
                     .ContinueWith(result =>
                            {
                                var number = result.Result;
                                var conversionTask = AsyncOperations.ConvertAsync(number);
                                conversionTask.Wait(1500);
                                return new OperationResult(conversionTask.Result);
                            })
                     .PipeTo(Self);
                });
        Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
    }
}

如果没有异常,我仍然可以毫无问题地得到 Got 42 :),这让我回到了 [1] 上面的“魔法”点。 此外,示例中提供的 AttachedToParentExecuteSynchronously 标志是可选的,还是非常需要它们才能让一切按预期工作?它们似乎对异常处理没有任何影响......

现在,如果 CalculateAnswerAsync 抛出异常,这意味着 result.Result 抛出 AggregateException,它几乎被吞没了。

如果可能的话,我应该怎么做才能使异步操作中的异常像“常规”异常一样使 actor 崩溃?

最佳答案

TPL 中错误处理的乐趣 :)

一旦任务开始在它自己的线程上运行,它内部发生的一切都已经与调用者异步——包括错误处理

  1. 当您在 actor 中启动您的第一个 Task 时,该任务将独立于您的 actor 在 ThreadPool 上运行。这意味着您在 Task 中所做的任何事情都已经与您的 actor 异步 - 因为它在不同的线程上运行。这is why I made a Task.Wait call inside the PipeTo sample you linked to在你的帖子的顶部。对 Actor 没有影响 - 它看起来像是一项长时间运行的任务。
  2. 异常 - 如果您的内部任务失败,conversionTask.Result 属性将抛出在其运行期间捕获的异常,因此您需要在您的 Task< 中添加一些错误处理 以确保您的 actor 在出现问题时得到通知。请注意,我在这里就是这样做的:https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117 - 如果您将异常转化为您的 Actor 可以处理的消息:鸟儿开始歌唱,彩虹闪耀,TPL 错误不再是痛苦和痛苦的根源。
  3. 至于抛出异常时会发生什么......

Now, if the CalculateAnswerAsync throws an exception, which means that result.Result throws AggregateException, it's pretty much swallowed without a trace.

AggregateException 将包含包含在其中的内部异常列表 - TPL 具有这种聚合错误概念的原因是 (a) 你有一个任务是延续多个任务的聚合,即 Task.WhenAll 或 (b) 您将错误传播到 ContinueWith 链回到父级。您还可以调用 AggregateException.Flatten() 调用来更轻松地管理嵌套异常。

TPL + Akka.NET 的最佳实践

处理来自 TPL 的异常是一件令人讨厌的事情,这是事实 - 但处理它的最佳方法是在 Tasktry..catch.. 异常并将它们转换为您的参与者可以处理的消息类。

Also, are the AttachedToParent and ExecuteSynchronously flags provided in an example optional, or are they pretty much required to have everything working as intended?

当您在延续上有延续时,这主要是一个问题 - PipeTo 自动在其自身上使用这些标志。它对错误处理的影响为零,但可确保您的延续立即在与原始 Task 相同的线程上执行。

我建议仅当您进行大量嵌套延续时才使用这些标志 - 一旦您深入到 1 个延续之后,TPL 开始对它如何安排您的任务采取一些自由(事实上,像 OnlyOnCompleted 这样的标志不再是超过 1 次继续后接受。)

关于c# - 参与者内部的异步 API 调用和异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28550275/

相关文章:

c# - 设置人工​​内存限制

c# - 无法将类型 'System.Web.Mvc.RedirectToRouteResult' 隐式转换为 'System.Web.Mvc.JsonResult'

C# 下载一系列文件

.net - 调度程序/通知服务建议/架构决策

akka.net 循环组路由器 - 只路由到一个路由

c# - 如何保留部署在不同集群节点上的两个相同参与者之间的状态? (阿卡网)

c# - Predicatebuilder group and or queries with inner outer

c# - 为什么 XmlDocument.Load(String) 似乎需要读写访问权限?

c# - Akka 网络集成测试

Akka.Net 发送大量消息(最大帧大小)