Go 例程以 for 循环开始 - 一个还是多个 channel ?

标签 go channel

我想使用从 for 循环调用的 goroutine 加载一些 json 文件(“.json”)。我希望并行加载(在加载其他文件时处理第一个文件)。

第一季度。由于文件数量可能会有所不同(要添加新文件),我将使用带有文件名的(文件)列表(仅在本示例中自动生成名称),因此我想使用 for 循环。最佳?

第二季度。最有效地利用 channel 的方式是什么。

第三季度。如果每个加载操作需要一个唯一的 channel (如下面的示例代码所示),我将如何定义 channel ?

示例代码(要压缩并能够使用文件名列表加载文件):


func load_json(aChan chan byte, s string) {
    // load "filename" + s + ".json"
    // confirm to the channel
    aChan <- 0
}

func do_stuff() {
    // .. with the newly loaded json
}

func Main() {
    chan_A := make(chan byte)
    go load_json(chan_A, "_classA")

    chan_B := make(chan byte)
    go load_json(chan_B, "_classB")

    chan_C := make(chan byte)
    go load_json(chan_C, "_classC")

    chan_D := make(chan byte)
    go load_json(chan_D, "_classD")


    <-chan_A
        // Now, do stuff with Class A
    <-chan_B
        // etc...
    <-chan_C
    <-chan_D
    fmt.Println("Done.")
}

编辑: 我根据“Tom”建议的想法设计了一个简化的测试解决方案(见下文)。就我而言,我将任务分为三个阶段,每个阶段使用一个 channel 来控制执行。但是,我倾向于使用此代码出现死锁(请参阅执行结果和代码下方的注释)。

PlayGround 上运行此代码.

如何避免此代码中的死锁?:

type TJsonFileInfo struct {
    FileName string
}
type TChannelTracer struct {  // Will count & display visited phases A, B, C
    A, B, C int
}
var ChannelTracer TChannelTracer

var jsonFileList = []string{
    "./files/classA.json",
    "./files/classB.json",
    "./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
    var newFileInfo TJsonFileInfo
    newFileInfo.FileName = aFileName
    // file, e := ioutil.ReadFile(newFileInfo.FileName)...
    ChannelTracer.A += 1
    fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
    aResultQueueChan <- &newFileInfo
}

func UnmarshalFile(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.B += 1
    fmt.Printf("B. Marshalled file: %s\n", FileInfo.FileName)
    aResultQueueChan <- FileInfo
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.C += 1
    fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
    aDoneQueueChan <- FileInfo
}

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
        go UnmarshalFile(marshalChan, processChan)
        go ProcessWork(processChan, doneProcessingChan)
    }

    for {
        select {
        case result := <-marshalChan:
            result.FileName = result.FileName // dummy use
        case result := <-processChan:
            result.FileName = result.FileName // dummy use
        case result := <-doneProcessingChan:
            result.FileName = result.FileName // dummy use
            fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
        }
    }
}

/**
RESULTS (for phases A, B and C):

A. Loaded file: ./files/classA.json
A. Loaded file: ./files/classB.json
A. Loaded file: ./files/classC.json
B. Marshalled file: ./files/classB.json
B. Marshalled file: ./files/classC.json
C. Processed file: ./files/classB.json 
C. Processed file: ./files/classC.json 
Done. Channels visited: {3 2 2}     // ChannelTracer for phase A, B and C
Done. Channels visited: {3 2 2}
fatal error: all goroutines are asleep - deadlock!
*/

请注意,此代码不会访问文件系统,因此它应该在 PlayGround 上运行

编辑2:-除了不安全的“ChannelTracer”之外,我只能通过与文件任务消耗相同次数的doneProcessingChannel来避免死锁。
在此处运行代码: Playground

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    go UnmarshalFiles(marshalChan, processChan)
    go ProcessWork(processChan, doneProcessingChan)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
    }

    //  Read doneProcessingChan equal number of times
    //  as the spawned tasks (files) above :
    for i := 0; i < len(jsonFileList); i++ {
        <-doneProcessingChan
        fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
    }
}

//RIL

最佳答案

建立在answer之上通过 @BraveNewCurrency我写了一个简单的例子 program为您服务:

package main

import (
    "encoding/json"
    "fmt"
    "os"
)

type Result struct {
    Some    string
    Another string
    AndAn   int
}

func generateWork(work chan *os.File) {
    files := []string{
        "/home/foo/a.json",
        "/home/foo/b.json",
        "/home/foo/c.json",
    }
    for _, path := range files {
        file, e := os.Open(path)
        if e != nil {
            panic(e)
        }
        work <- file
    }
}

func processWork(work chan *os.File, done chan Result) {
    file := <-work
    decoder := json.NewDecoder(file)
    result := Result{}
    decoder.Decode(&result)
    done <- result
}

func main() {
    work := make(chan *os.File)
    go generateWork(work)
    done := make(chan Result)
    for i := 0; i < 100; i++ {
        go processWork(work, done)
    }
    for {
        select {
        case result := <-done:
            // a result is available
            fmt.Println(result)
        }
    }
}

请注意,该程序无法在 Playground 上运行,因为那里不允许文件系统访问。

编辑:

为了回答您问题中的版本,我获取了代码和 changed some small things :

package main

import (
    _ "encoding/json"
    "fmt"
    _ "io/ioutil"
    _ "os"
)

type TJsonMetaInfo struct {
    MetaSystem string
}

type TJsonFileInfo struct {
    FileName string
}

type TChannelTracer struct { // Will count & display visited phases A, B, C
    A, B, C int
}

var ChannelTracer TChannelTracer

var jsonFileList = []string{
    "./files/classA.json",
    "./files/classB.json",
    "./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
    newFileInfo := TJsonFileInfo{aFileName}
    // file, e := ioutil.ReadFile(newFileInfo.FileName)
    // etc...
    ChannelTracer.A += 1
    fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
    aResultQueueChan <- &newFileInfo
}

func UnmarshalFiles(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
    for {
        FileInfo := <-aWorkQueueChan
        ChannelTracer.B += 1
        fmt.Printf("B. Unmarshalled file: %s\n", FileInfo.FileName)
        aResultQueueChan <- FileInfo
    }
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
    for {
        FileInfo := <-aWorkQueueChan
        ChannelTracer.C += 1
        fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
        aDoneQueueChan <- FileInfo

    }
}

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    go UnmarshalFiles(marshalChan, processChan)
    go ProcessWork(processChan, doneProcessingChan)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
    }

    for {
        select {
        case result := <-doneProcessingChan:
            result.FileName = result.FileName // dummy use
            fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
        }
    }
}

请注意,此代码仍然死锁,但最后,当所有工作完成时,在 main() 中的最后一个空 for 循环中。

另请注意这些行:

ChannelTracer.A += 1
ChannelTracer.B += 1
ChannelTracer.C += 1

不是并发安全的。这意味着在多线程环境中,一个 goroutine 和另一个 goroutine 可能会尝试同时递增同一计数器,从而导致计数错误。要解决此问题,请查看以下软件包:

关于Go 例程以 for 循环开始 - 一个还是多个 channel ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16873000/

相关文章:

node.js - Golang 中的 Bcrypt 密码散列(与 Node.js 兼容)?

go - 为什么没有接收器被阻塞的错误?

audio - 使用音频工具 sox,如何确定立体声录音是否实际上是单声道?

具有多种类型的协程和 channel

sockets - 在不关闭连接的情况下关闭从 TCP 连接读取的 goroutine

go 例程未从 channel 收集所有对象

reflection - 解决反射开销的最佳方法是什么?

go - 大整数给我错误的平等结果

go - 在 Golang 中转换组合类型

go - ffmpeg:读取 header 管道时出错:0:参数无效