c# - 强制任务在当前线程上继续?

标签 c# async-await akka system.reactive tpl-dataflow

我正在为 .NET 移植 AKKA 框架(现在不要太认真,现在是其中 Actor 部分的周末 hack)

我对其中的“Future”支持有一些问题。 在 Java/Scala Akka 中,Future 将通过 Await 调用同步等待。 很像 .NET Task.Wait()

我的目标是为此支持真正的异步等待。 它现在可以正常工作,但在我当前的解决方案中,继续是在错误的线程上执行的。

这是向我的一位 Actor 传递消息时的结果,其中包含 future 的 await block 。 如您所见,actor 始终在同一个线程上执行,而 await block 在随机线程池线程上执行。

actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...

参与者使用数据流获取消息 BufferBlock<Message> 或者更确切地说,我在缓冲区 block 上使用 RX 来订阅消息。 它是这样配置的:

var messages = new BufferBlock<Message>()
{
        BoundedCapacity = 100,
        TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);

到目前为止一切顺利。

但是,当我等待 future 的结果时。 像这样:

protected override void OnReceive(IMessage message)
{
    ....

    var result = await Ask(logger, m);
    // This is not executed on the same thread as the above code
    result.Match()  
       .With<SomeMessage>(t => {
       Console.WriteLine("await thread {0}",
          System.Threading.Thread.CurrentThread.GetHashCode());
        })
       .Default(_ => Console.WriteLine("Unknown message"));
     ...

我知道这是 async await 的正常行为,但我真的必须确保只有一个线程可以访问我的 actor。

我不希望 future 同步运行,我想像平常一样运行异步,但我希望继续运行在与消息处理器/参与者相同的线程上。

我的 future 支持代码如下所示:

public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
    TaskCompletionSource<IMessage> result = 
        new TaskCompletionSource<IMessage>();
    var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());

    // once this object gets a response, 
    // we set the result for the task completion source
    var futureActorRef = new FutureActorRef(result);            
    future.Tell(new SetRespondTo(), futureActorRef); 
    actor.Tell(message, future); 
    return result.Task;
}

有什么想法可以强制继续在启动上述代码的同一线程上运行吗?

最佳答案

I'm making a port of the AKKA framework for .NET

甜甜的。尽管我从未接触过 Java/Scala/Akka,但我参加了 CodeMash '13 的 Akka 演讲。我在那里看到了 .NET 库/框架的巨大潜力。微软is working on something similar ,我希望它最终会普遍可用(它是 currently in a limited preview )。

我怀疑尽可能多地停留在 Dataflow/Rx 世界是更简单的方法; async 最适用于异步操作(每个操作有一个开始和一个结果),而 Dataflow 和 Rx 更适合流和订阅(有一个开始和多个结果)。所以我的第一个直觉 react 是要么将缓冲区 block 链接到具有特定调度程序的 ActionBlock,要么使用 ObserveOn 将 Rx 通知移动到特定调度程序,而不是尝试在 async 端完成。当然,我对 Akka API 设计不是很熟悉,所以对此持保留态度。

无论如何,我的async intro描述了调度 await 延续的仅有的两个可靠选项:SynchronizationContext.CurrentTaskScheduler.Current。如果您的 Akka 端口更像是一个框架(您的代码在其中进行托管,而最终用户代码始终由您的代码执行),那么 SynchronizationContext 可能有意义.如果您的端口更像是一个(最终用户代码在这里托管并根据需要调用您的代码),那么 TaskScheduler 会更有意义。

自定义 SynchronizationContext 的示例并不多,因为这种情况非常罕见。我有一个 AsyncContextThread type在我的 AsyncEx library它为该线程定义了 SynchronizationContextTaskScheduler。有几个自定义 TaskScheduler 的示例,例如 Parallel Extensions Extras它有一个 STA scheduler和一个 "current thread" scheduler .

关于c# - 强制任务在当前线程上继续?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20868161/

相关文章:

c# - 如何旋转 ashx 处理程序检索到的图像

c# - XNA 中的旋转

javascript - 如何在 Restify 中间件中捕获未处理的 Promise 拒绝?

java - 使用 scala akka 工具包与 java akka 工具包版本有什么好处?

scala - akka中FastFuture有什么用

java - OLP CLI 错误 : java. 基础未将 sun.security.util 导出到 JDK 16 下的未命名模块

c# - 使用 C# TcpChannel 服务器的 Java RMI 客户端

c# - 以编程方式将现有项目添加到新的 VS2012 解决方案失败

c# - 在我想要等待的方法上获得 Cannot await void

c# - BeginReceive 回调中的异步/等待