go - 使用 golang channel 的结果不一致

标签 go channel goroutine

我用 Go 编写了一个程序,用于从一批文本文件中提取去重后的列表。我用 channel 做了一些并行化,之后结果就变得不一致了:即使输入文件完全相同,每次运行输出的记录数也会有 5 条左右的出入(有时多,有时少)。

我在 Fedora x86_64、go1.1.2、8 核 AMD 机器上用 `go run process.go | wc -l` 进行测试。

代码是:

package main

import (
    "fmt"
    "os"
    "io"    
    "encoding/csv"
    "regexp"
    "log"
)

var (
    // cleanRe matches each run of characters that are not ASCII digits 0-9.
    cleanRe = regexp.MustCompile("[^0-9]+")
    // comma is the field delimiter handed to csv.Reader; '\t' makes the
    // input tab-separated despite the variable's name.
    comma = '\t'
    // fieldsPerRecord is copied into csv.Reader.FieldsPerRecord; a negative
    // value disables the reader's per-record field-count check.
    fieldsPerRecord = -1
)

// clean strips every non-digit character from s and returns the digits that
// remain, or "" when fewer than six digits are left.
func clean(s string) string {
    digits := cleanRe.ReplaceAllLiteralString(s, "")
    if len(digits) >= 6 {
        return digits
    }
    return ""
}

// uniqueChannel consumes [id, value] pairs from inputChan until the channel
// is closed and prints each distinct (id, value) pair to stdout the first
// time it is seen. On return it logs how many pairs were read and sends
// "Input digester." on controlChan so main can tell the consumer is done.
//
// Every record must have at least two elements: record[0] is the id and
// record[1] is the value.
func uniqueChannel(inputChan chan []string, controlChan chan string) {
    defer func() { controlChan <- "Input digester." }()
    uniq := make(map[string]map[string]bool)
    i := 0
    for record := range inputChan {
        i++
        id, v := record[0], record[1]
        seen := uniq[id]
        if seen == nil {
            seen = make(map[string]bool)
            uniq[id] = seen
        }
        // The first value for an id must be recorded and printed as well.
        // Skipping it made the output depend on which producer goroutine
        // happened to deliver that id first, i.e. nondeterministic.
        if !seen[v] {
            seen[v] = true
            // fmt.Println separates operands with a space, so each line
            // is "id \t v".
            fmt.Println(id, string(comma), v)
        }
    }
    log.Println("digest ", i)
}

// processFile parses fileName with csv.Reader (delimiter comma, field-count
// check from fieldsPerRecord). For each record it sends one [id, cleaned]
// pair on outputChan per field after the first whose clean() result is
// non-empty; id is the record's first field. It logs the number of pairs
// sent and sends fileName on controlChan when it returns.
//
// Malformed records (*csv.ParseError) are skipped. Any other read error is
// fatal, like the open error: such errors are returned again by every
// subsequent Read, so skipping them would spin forever.
func processFile(fileName string, outputChan chan []string, controlChan chan string) {
    defer func() { controlChan <- fileName }()
    f, err := os.Open(fileName)
    if err != nil {
        log.Fatal(err)
    }
    // Release the descriptor; the original never closed the file.
    defer f.Close()
    r := csv.NewReader(f)
    r.FieldsPerRecord = fieldsPerRecord
    r.Comma = comma

    //  Process the records
    i := 0
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            if _, ok := err.(*csv.ParseError); ok {
                continue // malformed line: skip it and keep reading
            }
            log.Fatal(err)
        }
        id := record[0]
        for _, v := range record[1:] {
            if cleanV := clean(v); cleanV != "" {
                i++
                outputChan <- []string{id, cleanV}
            }
        }
    }
    log.Println(fileName, i)
}


// main starts one processFile producer per input and a single uniqueChannel
// consumer, then waits for every goroutine to report on controlChan before
// exiting.
func main() {
    inputs := []string{}
    recordChan := make(chan []string, 100)
    // One completion message is expected from each processFile goroutine,
    // plus one from uniqueChannel.
    pending := len(inputs) + 1
    controlChan := make(chan string, pending)

    //  Ingest the inputs: one producer goroutine per file.
    for _, name := range inputs {
        go processFile(name, recordChan, controlChan)
    }

    //  Single consumer that filters out duplicate pairs.
    go uniqueChannel(recordChan, controlChan)

    //  Collect completion messages. When only one message is outstanding,
    //  every producer has finished, so close recordChan to let the
    //  consumer's range loop end and send its own message.
    for ; pending > 0; pending-- {
        if pending == 1 {
            close(recordChan)
        }
        log.Println(<-controlChan)
    }
    close(controlChan)
}

看起来 channel 在数据被读完之前就被关闭了。可如果不加关闭机制,程序又会死锁——我已经没有头绪了。

最佳答案

可以去掉控制 channel,改用 sync.WaitGroup:

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "os"
    "regexp"
    "sync"
)

var (
    // cleanRe matches each run of characters that are not ASCII digits 0-9.
    cleanRe = regexp.MustCompile("[^0-9]+")
    // comma is the field delimiter handed to csv.Reader; '\t' makes the
    // input tab-separated despite the variable's name.
    comma = '\t'
    // fieldsPerRecord is copied into csv.Reader.FieldsPerRecord; a negative
    // value disables the reader's per-record field-count check.
    fieldsPerRecord = -1
)

// clean returns s with every non-digit character removed, or the empty
// string if that leaves fewer than six digits.
func clean(s string) string {
    if digits := cleanRe.ReplaceAllLiteralString(s, ""); len(digits) >= 6 {
        return digits
    }
    return ""
}

// uniqueChannel consumes [id, value] pairs from inputChan until the channel
// is closed and prints each distinct (id, value) pair to stdout the first
// time it is seen, then logs how many pairs were read.
//
// Every record must have at least two elements: record[0] is the id and
// record[1] is the value.
func uniqueChannel(inputChan chan []string) {
    uniq := make(map[string]map[string]bool)
    i := 0
    for record := range inputChan {
        i++
        id, v := record[0], record[1]
        seen := uniq[id]
        if seen == nil {
            seen = make(map[string]bool)
            uniq[id] = seen
        }
        // The first value for an id must be recorded and printed as well.
        // Skipping it made the output depend on which producer goroutine
        // happened to deliver that id first, i.e. nondeterministic.
        if !seen[v] {
            seen[v] = true
            // fmt.Println separates operands with a space, so each line
            // is "id \t v".
            fmt.Println(id, string(comma), v)
        }
    }
    log.Println("digest ", i)
}

// processFile parses fileName with csv.Reader (delimiter comma, field-count
// check from fieldsPerRecord). For each record it sends one [id, cleaned]
// pair on outputChan per field after the first whose clean() result is
// non-empty; id is the record's first field.
//
// Malformed records (*csv.ParseError) are skipped. Any other read error is
// fatal, like the open error: such errors are returned again by every
// subsequent Read, so skipping them would spin forever.
func processFile(fileName string, outputChan chan []string) {
    f, err := os.Open(fileName)
    if err != nil {
        log.Fatal(err)
    }
    // Release the descriptor; the original never closed the file.
    defer f.Close()
    r := csv.NewReader(f)
    r.FieldsPerRecord = fieldsPerRecord
    r.Comma = comma

    //  Process the records
    for {
        record, err := r.Read()
        if err == io.EOF {
            return
        }
        if err != nil {
            if _, ok := err.(*csv.ParseError); ok {
                continue // malformed line: skip it and keep reading
            }
            log.Fatal(err)
        }
        id := record[0]
        for _, v := range record[1:] {
            if cleanV := clean(v); cleanV != "" {
                outputChan <- []string{id, cleanV}
            }
        }
    }
}

// main processes the files named on the command line concurrently, or
// ex.tsv when no arguments are given, and prints the distinct (id, value)
// pairs found in them. uniqueChannel runs on the main goroutine and
// consumes recordChan. A helper goroutine closes recordChan once every
// processFile call has returned, which ends uniqueChannel's range loop.
func main() {
    inputs := os.Args[1:]
    if len(inputs) == 0 {
        inputs = []string{"ex.tsv"}
    }
    recordChan := make(chan []string)

    var wg sync.WaitGroup
    //  Ingest the inputs
    for _, fName := range inputs {
        wg.Add(1)
        // Pass fName as an argument. Before Go 1.22 all closures share the
        // single loop variable and may all observe the last file name.
        go func(name string) {
            defer wg.Done()
            processFile(name, recordChan)
        }(fName)
    }
    go func() {
        wg.Wait()
        close(recordChan)
    }()

    //  This is the loop to ensure it's all unique
    uniqueChannel(recordChan)
}

关于go - 使用 golang channel 的结果不一致,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20652221/

相关文章:

unit-testing - 我应该如何处理使用 runtime.GOOS 的 Go 函数的测试?

go - 使用 golang 从 YAML 解码/编码多行字符串字段

go - 如何停止正在监听 RethinkDB 变更源的 goroutine?

matlab - Simulink 中的多通道音频输出

csv - 同时写入多个 csv 文件,在 Golang 中的分区列上拆分

goroutine - select 真的会随机选择一个 case 吗?

memory-management - 使用 `make` 并发内存分配?

go - 哪种新语言最适合编写操作系统

go - 没有 fmt.Print() 就不会发生光标移动

sockets - BufferedReader 在流的末尾阻塞