f# - 为什么任务没有分配给所有 worker ?

标签 f# netmq

以下代码翻译自 ZeroMQ 指南中的 Divide and Conquer(分而治之)示例。

module ZeroMQ

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets

/// Question version of the ZeroMQ guide's "Divide and Conquer" example, run
/// inside one process: a ventilator pushes `task_number` tasks, four worker
/// tasks pull them and sleep for the requested time, and a sink task counts
/// the frames it receives and prints the elapsed time.
/// This is the variant the question is about: as reported below the code,
/// a single worker ends up receiving every message.
let parallel_task () =
    let task_number = 100
    // Build the two endpoint strings shared by every socket in this function.
    let uri_source, uri_sink = 
        let uri = "ipc://parallel_task"
        Path.Join(uri,"source"), Path.Join(uri,"sink")

    printfn "%A, %A" uri_source uri_sink

    let rnd = Random()
    // Ventilator-side sockets. They are created from a bare address string,
    // with no explicit Bind/Connect call.
    // NOTE(review): whether each socket binds or connects is therefore left to
    // the NetMQ constructor's default for that socket type -- confirm against
    // the NetMQ documentation; the worker and sink sockets below are created
    // the same way.
    use source = new PushSocket(uri_source)
    use sink = new PushSocket(uri_sink)
    // One workload per task: (rnd.Next 100) + 1, later used by the workers as
    // a sleep duration in milliseconds.
    let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)

    // Prints the summed workload and blocks until the user presses enter.
    let ventilator_init () =
        printf "Press enter when workers are ready.\n"
        printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
        Console.ReadLine() |> ignore

    // Sends a "0" frame on the sink endpoint, then every task on the source
    // endpoint. `sink` here is still the PushSocket defined above; the
    // function named `sink` is only defined further down.
    let ventilator_run () =
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        Array.iter (string >> source.SendFrame) tasks
        Thread.Sleep(1)

    // Worker loop: receive a task, sleep for that many milliseconds, then
    // send an empty frame on the sink endpoint. Never returns.
    let worker i () =
        printf "Starting worker %i\n" i
        use source = new PullSocket(uri_source)
        use sink = new PushSocket(uri_sink)
        while true do
            let msg = source.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            //printf "%s.\n" msg
            Thread.Sleep(int msg)
            sink.SendFrame("")

    // Sink: receives `task_number` frames, starting the stopwatch after the
    // first one, and prints one progress character per frame.
    // NOTE(review): the "0" frame sent by ventilator_run presumably arrives on
    // this socket too, in which case it is counted among the `task_number`
    // frames -- confirm.
    let sink () =
        use sink = new PullSocket(uri_sink)
        let watch = Diagnostics.Stopwatch()
        for i=1 to task_number do
            let _ = sink.ReceiveFrameString()
            if watch.IsRunning = false then watch.Start()
            printf (if i % 10 = 0 then ":" else ".")
        printf "\nTotal elapsed time: %A msec\n" watch.Elapsed
    // Wait for the user, start four workers and the sink as background tasks,
    // push the batch, then block until the sink has received its frames.
    ventilator_init()
    for i=1 to 4 do Task.Run (worker i) |> ignore
    let t = Task.Run sink
    ventilator_run()
    t.Wait()

/// Program entry point: runs the parallel-task demo once and reports success.
/// The command-line arguments in `argv` are not used.
[<EntryPoint>]
let main argv =
    let exit_code = 0
    parallel_task ()
    exit_code

这里发生的情况是,单个工作线程获取所有消息,并且其他线程都没有被分配任何工作。为什么会发生这种情况?

最佳答案

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets

/// Runs the ZeroMQ guide's "Divide and Conquer" pipeline inside one process.
///
/// Three roles run as background tasks:
///   * the ventilator binds the task endpoint and pushes `task_number` tasks;
///   * four workers connect to it, sleep for each task's duration and report
///     completion to the sink endpoint;
///   * the sink binds the result endpoint, waits for the start-of-batch
///     signal, then times how long the `task_number` results take to arrive.
///
/// Every socket states explicitly whether it binds or connects.
/// Blocks until the sink has received all results, then returns unit.
let parallel_task () =
    let task_number = 100
    // Endpoints are URIs, not file-system paths: join them with a literal '/'
    // rather than Path.Join, whose separator depends on the platform.
    let uri_source, uri_sink =
        let uri = "ipc://parallel_task"
        uri + "/source", uri + "/sink"

    /// Generates the batch, waits for the user, signals the sink and pushes
    /// every task to the workers.
    let ventilator () =
        let rnd = Random()
        use source = new PushSocket()
        source.Bind(uri_source)
        use sink = new PushSocket()
        sink.Connect(uri_sink)
        // Each task is a simulated workload of 1..100 milliseconds.
        let tasks = Array.init task_number (fun _ -> rnd.Next(100) + 1)
        printf "Press enter when workers are ready.\n"
        printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
        Console.ReadLine() |> ignore
        // Start-of-batch signal: the sink starts its stopwatch on this frame.
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        for task in tasks do
            source.SendFrame(string task)
        // Give the sockets a moment to flush queued frames before `use`
        // disposes them. NOTE(review): one second is assumed from the guide's
        // `sleep(1)`; the previous value was 1 ms.
        Thread.Sleep(1000)

    /// Worker loop: pull a task, sleep for that many milliseconds, then push
    /// an empty completion frame to the sink. Never returns.
    let worker i () =
        printf "Starting worker %i\n" i
        use source = new PullSocket()
        source.Connect(uri_source)
        use sink = new PushSocket()
        sink.Connect(uri_sink)
        while true do
            let msg = source.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            Thread.Sleep(int msg)
            sink.SendFrame("")

    /// Collects the results and prints the wall-clock time of the batch.
    let sink () =
        use results = new PullSocket()
        results.Bind(uri_sink)
        // Consume the ventilator's start-of-batch frame separately so that it
        // is not counted as one of the `task_number` results.
        results.ReceiveFrameString() |> ignore
        let watch = Diagnostics.Stopwatch.StartNew()
        for i = 1 to task_number do
            results.ReceiveFrameString() |> ignore
            // One '.' per result, ':' on every tenth.
            printf "%s" (if i % 10 = 0 then ":" else ".")
        watch.Stop()
        // Elapsed is a TimeSpan, printed in the same form as the expected time.
        printf "\nTotal elapsed time: %A\n" watch.Elapsed

    Task.Run ventilator |> ignore
    for i=1 to 4 do Task.Run (worker i) |> ignore
    Task.Run(sink).Wait()

这是上面代码整理后的版本,可以正常工作。关键在于必须显式指定每个套接字是绑定(bind)还是连接(connect)。谢谢 @somdoron 的提示。

关于f# - 为什么任务没有分配给所有 worker ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61731469/

相关文章:

f# - 将数据库记录建模为类型

c# - NetMQ 未处理异常最佳实践

c# - 从多个线程访问NetMQ套接字

f# - 如何在 SqlTransaction 中使用 "use"绑定(bind)

F# 中断 while 循环

android - Xamarin/安卓 : F# scoping - how do I see a namespace in a different file?

f# - OCaml 函数少传入一个参数

c# - ZeroMQ 服务器/客户端类型的正确模式

c# - 与经典 TCP 套接字通信