我正在尝试解决以下问题。我有一些实时运行的代理,有几毫秒的大心跳,因此它们处理的操作顺序大多是确定性的(因为消息处理不是瓶颈)。
现在,我正在对不再有心跳的系统运行大量模拟(否则将需要几个世纪)- 但我需要确保保留操作顺序。为此,我采用了以下解决方案:模拟器通过发布虚拟同步消息并在等待应答时阻塞来确保每个代理都处理了他的消息队列。这确实适用于我的应用程序,但它所花费的时间并不直观 - 因为单线程实现会快一个数量级(我猜 - x 100 左右 - 虽然我没有测试过)。
我已经隔离了一个显示问题的小测试,甚至尝试使用另一个库 akka.net
type Greet =
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2
[<EntryPoint>]
let main argv =
let system = System.create "MySystem" <| Configuration.load()
let greeter = spawn system "greeter" <| fun mailbox ->
let rec loop() = actor {
let! msg = mailbox.Receive()
let sender = mailbox.Sender()
match msg with
| Greet who -> () // printf "Hello, %s!\n" who
| Hello2 -> sender.Tell(true)
| _ -> ()
return! loop()
}
loop()
let greeterF =
MailboxProcessor.Start
(fun inbox ->
async {
while true do
let! msg = inbox.Receive()
match msg with
| Greet who -> () // printf "Hello, %s!\n" who
| Hello reply -> reply.Reply true
| _ -> ()
}
)
let n = 1000000
let t1 = System.Diagnostics.Stopwatch()
t1.Start()
for i = 1 to n do
let rep = greeterF.PostAndReply(fun reply -> (Hello reply)) |> ignore
()
printfn "elapsed Mailbox:%A" t1.ElapsedMilliseconds
t1.Restart()
for i = 1 to n do
let res = greeter.Ask (Hello2)
let rr = res.Result
()
printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
System.Console.ReadLine () |> ignore
0
基本上,对于仅仅 100 万次同步,两者都需要大约 10 秒的时间 - 而不是任何涉及的计算,这是......不幸的。
我想知道是否有人遇到过同样的问题,是否有办法关闭开销,迫使一切都在单线程模式下运行……这比停用所有 cpus 但 1 个在bios - 或者在没有代理的情况下编写整个系统的克隆。
非常感谢任何帮助。
最佳答案
这里 Akka.NET 版本变慢的原因是你与 actor 的沟通方式:
main process Task FutureActorRef !!ThreadPool!! greeter
Ask ---------------------->
Tell----------->
MailboxRun ----->
(greeter mailbox is empty) |
<--------------------------Tell
<--Complete task
<----------.Result
对于每次迭代,都会创建一个 TPL 任务
然后一条消息被发送给欢迎者。
然后主进程在等待响应返回时阻塞。
欢迎程序回复,这又完成了
FutureActorRef
中的任务
冲洗并重复.. 这种设计将导致 Akka.NET 为每条消息启动和停止欢迎程序“邮箱运行”,因为邮箱队列在每次迭代中变空。 这会导致为传递的每条消息进行线程池调度。
It's a bit like entering your car, putting the pedal to the metal, then abruptly stop and step out of the car, and then repeating the procedure again. That is just not a very effective way to travel fast.
@Aaronontheweb 的建议只有在您解决代码中的上述问题时才会生效。 邮箱需要能够不断地挑选内部队列的消息来批量处理消息,以实现全吞吐量。
相反,将生产者与消费者分开。 创建一个 actor 来监听你的问候者的响应。 一旦该参与者处理了您的 1000000 条消息,让该参与者将 WorkCompleted 消息发送回消费者。
[编辑] 我自己试了一下,我不懂 F#,所以它可能不完全地道:)
open Akka
open Akka.Actor
open Akka.FSharp
type Greet =
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2
type Consume =
| Response
| SetSender
[<EntryPoint>]
let main argv =
let system = System.create "MySystem" <| Configuration.load()
let greeter = spawn system "greeter" <| fun mailbox ->
let rec loop() = actor {
let! msg = mailbox.Receive()
let sender = mailbox.Sender()
match msg with
| Greet who -> () // printf "Hello, %s!\n" who
| Hello2 -> sender.Tell(Response)
| _ -> ()
return! loop()
}
loop()
let consumer = spawn system "consumer" <| fun mailbox ->
let rec loop(count,sender : IActorRef) = actor {
if count = 1000000 then sender.Tell(true)
let! msg = mailbox.Receive()
match msg with
| Response -> return! loop(count+1,sender)
| SetSender -> return! loop(count,mailbox.Sender())
}
loop(0,null)
let n = 1000000
let t1 = System.Diagnostics.Stopwatch()
t1.Start()
for i = 1 to n do
greeter.Tell(Hello2,consumer)
let workdone = consumer.Ask SetSender
workdone.Wait()
printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
System.Console.ReadLine () |> ignore
0
我更新了您的代码以使用单独的消费者来响应参与者,然后在处理完所有回复后回复。
通过这样做,您在我的机器上的处理时间现在减少到 650 毫秒。
如果您想要更好的吞吐量,您需要让更多的参与者参与进来以实现更多的并行化。
我不确定这对您的特定情况是否有帮助
关于performance - 同步时 F# Akka.NET 代理性能优化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29638978/