c# - 在 akka.net actor 中使用 async/await 是否安全

标签 c# async-await akka.net

在下面的代码中,我使用了 .net 提供的语法糖,即 async/await 方法,但请注意这不是在 akka 中处理异步操作的好方法,我应该使用 PipeTo()。

public class AggregatorActor : ActorBase, IWithUnboundedStash
{
    #region Constructor
    public AggregatorActor(IActorSystemSettings settings, IAccountComponent component, LogSettings logSettings) : base(settings, logSettings)
    {
        _accountComponent = component;
        _settings = settings;
    }
    #endregion

    #region Public Methods

    public override void Listening()
    {

        ReceiveAsync<ProfilerMessages.ProfilerBase>(async x => await HandleMessage(x));
        ReceiveAsync<ProfilerMessages.TimeElasped>(async x => await HandleMessage(x));
    }
    public override async Task HandleMessage(object msg)
    {
        msg.Match().With<ProfilerMessages.GetSummary>(async x =>
        {
            _sender = Context.Sender;
            //Become busy. Stash
            Become(Busy);

            //Handle different request
            await HandleSummaryRequest(x.UserId, x.CasinoId, x.GamingServerId, x.AccountNumber, x.GroupName);
        });
        msg.Match().With<ProfilerMessages.RecurringCheck>(x =>
        {
            _logger.Info("Recurring Message");
            if (IsAllResponsesReceived())
            {
                BeginAggregate();
            }
        });
        msg.Match().With<ProfilerMessages.TimeElasped>(x =>
        {
            _logger.Info("Time Elapsed");
            BeginAggregate();
        });
    }
    private async Task HandleSummaryRequest(int userId, int casinoId, int gsid, string accountNumber, string groupName)
    {
        try
        {
            var accountMsg = new AccountMessages.GetAggregatedData(userId, accountNumber, casinoId, gsid);
            //AskPattern.AskAsync<AccountMessages.AccountResponseAll>(Context.Self, _accountActor, accountMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
            _accountActor.Tell(accountMsg);

            var contactMsg = new ContactMessages.GetAggregatedContactDetails(userId);
            //AskPattern.AskAsync<Messages.ContactMessages.ContactResponse>(Context.Self, _contactActor, contactMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
            _contactActor.Tell(contactMsg);

            var analyticMsg = new AnalyticsMessages.GetAggregatedAnalytics(userId, casinoId, gsid);
            //AskPattern.AskAsync<Messages.AnalyticsMessages.AnalyticsResponse>(Context.Self, _analyticsActor, analyticMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
            _analyticsActor.Tell(analyticMsg);

            var financialMsg = new FinancialMessages.GetAggregatedFinancialDetails(userId.ToString());
            //AskPattern.AskAsync<Messages.FinancialMessages.FinancialResponse>(Context.Self, _financialActor, financialMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
            _financialActor.Tell(financialMsg);

            var verificationMsg = VerificationMessages.GetAggregatedVerification.Instance(groupName, casinoId.ToString(), userId.ToString(), gsid);
            _verificationActor.Tell(verificationMsg);

            var riskMessage = RiskMessages.GeAggregatedRiskDetails.Instance(userId, accountNumber, groupName, casinoId, gsid);
            _riskActor.Tell(riskMessage);

            _cancelable = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromMilliseconds(_settings.AggregatorTimeOut), Self, Messages.ProfilerMessages.TimeElasped.Instance(), Self);
            _cancelRecurring = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.RecurringResponseCheck, _settings.RecurringResponseCheck, Self, Messages.ProfilerMessages.RecurringCheck.Instance(), Self);
        }
        catch (Exception ex)
        {
            ExceptionHandler(ex);
        }
    }
    #endregion
}

如您在示例代码中所见,我正在使用 async/await,并使用 Akka.net 提供的 ReceiveAsync() 方法。

如果我们不能在 actor 中使用 async/await,那么 ReceiveAsync() 的目的是什么?

最佳答案

您可以在 actor 中使用 async/await,但这需要一些必要的编排来暂停/恢复 actor 的邮箱,直到异步任务完成。 这使得 actor 不可重入,这意味着它不会选择任何新消息,直到当前任务完成。要在 actor 中使用 async/await,您可以:

  1. 使用可以采用异步处理程序的 ReceiveAsync
  2. 使用 ActorTaskScheduler.RunTask 包装您的异步方法调用。这通常在 actor 生命周期方法的上下文中很有用(例如 PreStart/PostStop)。

请记住,如果使用默认的 actor 消息调度程序,这将有效,但如果 actor 配置为使用不同类型的调度程序,则不能保证有效。

此外,在 actor 内部使用 async/await 也会带来性能下降,这与挂起/恢复机制和 actor 缺乏可重入性有关。在许多业务案例中,这并不是真正的问题,但有时在高性能/低延迟工作流中可能是个问题。

关于c# - 在 akka.net actor 中使用 async/await 是否安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47709420/

相关文章:

c# - .Net Core XmlDocument 不返回声明

c# - 如何使用设计器生成的数据表手动设置自动增量列的 ID?

javascript - 如何在 3 秒后运行等待函数

c# - 在单独的进程中部署 Akka.net 参与者

c# - Braintree API 在 Sandbox 中的 CreateCard.Create 上抛出 Braintree.Exceptions.AuthorizationException

c# - 浏览器提示下载 JSON 响应,ASP.NET MVC2

c# - 具有受控并行性和 AwaitAll 选项的异步任务

c# - 如何结合 TaskCompletionSource 和 CancellationTokenSource?

c# - Akka.Remote 配置失败

Akka.Net 自定义Mailbox,自定义IMessageQueue,或者其他东西