concurrency - 保证发送到邮箱处理器的消息的顺序

标签 concurrency f#

我有一个邮箱处理器,它接收固定数量的消息:

let consumeThreeMessages = MailboxProcessor.Start(fun inbox ->
        async {
            let! msg1 = inbox.Receive()
            printfn "msg1: %s" msg1

            let! msg2 = inbox.Receive()
            printfn "msg2: %s" msg2

            let! msg3 = inbox.Receive()
            printfn "msg3: %s" msg3
        }
    )

consumeThreeMessages.Post("First message")
consumeThreeMessages.Post("Second message")
consumeThreeMessages.Post("Third message")

这些消息应该完全按照发送的顺序进行处理。在我的测试过程中,它准确地打印出它应该的内容:
First message
Second message
Third message

但是,由于消息发布是异步的,听起来快速发布 3 条消息可能会导致以任何顺序处理项目。例如,我不想无序接收消息并得到如下内容:
Second message // <-- oh noes!
First message
Third message 

是否保证按发送顺序接收和处理消息?或者是否有可能乱序接收或处理消息?

最佳答案

您的 consumeThreeMessages 中的代码由于 F# 的异步工作流的工作方式,函数将始终按顺序执行。

以下代码:

   async {
            let! msg1 = inbox.Receive()
            printfn "msg1: %s" msg1

            let! msg2 = inbox.Receive()
            printfn "msg2: %s" msg2

        }

大致翻译为:
async.Bind(
    inbox.Receive(),
    (fun msg1 -> 
         printfn "msg1: %s" msg1
         async.Bind(
             inbox.Receive(),
             (fun msg2 -> printfn "msg2: %s" msg2)
         )
    )
)

当您查看脱糖形式时,很明显代码是串行执行的。 'async' 部分在 async.Bind 的实现中发挥作用。 ,它将异步启动计算并在完成时“唤醒”以完成执行。这样您就可以利用异步硬件操作,而不会在等待 IO 操作的 OS 线程上浪费时间。

然而,这并不意味着您在使用 F# 的异步工作流时不会遇到并发问题。假设您执行了以下操作:
let total = ref 0

let doTaskAsync() = 
    async { 
        for i = 0 to 1000 do 
            incr total
    } |> Async.Start()

// Start the task twice
doTaskAsync()
doTaskAsync()

上面的代码会有两个异步工作流同时修改同一个状态。

因此,简要回答您的问题:在单个异步块的主体内,事物将始终按顺序执行。 (也就是说, let! 或 do! 之后的下一行在异步操作完成之前不会执行。)但是,如果您在两个异步任务之间共享状态,则所有赌注都将关闭。在这种情况下,您需要考虑锁定或使用 CLR 4.0 附带的并发数据结构。

关于concurrency - 保证发送到邮箱处理器的消息的顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2219686/

相关文章:

java - 多线程应用程序中的不可变对象(immutable对象) - 它是如何工作的?

haskell - 在 Haskell 中内存 IO 计算

java - 并发 hashmap size() 方法复杂度

java - 在 Java 中使用 volatile 集合和数组

f# - 如何在 F# 中编写高效的列表/序列函数? (mapFoldWhile)

f# - 是否可以将 F# 代码转换为 C# 代码?

generics - 转换联合 <'a> to a Union<' b>

f# - 在 .net 5 中运行 Forms 表单 F# 脚本?

f# - 为什么不能将带有 byref 的函数直接转换为委托(delegate)?

Java:并发读取 InputStream