遍历文件夹时 Goroutine 死锁

标签 go concurrency goroutine

我有这个基于 pipelines 的代码例子。 walkFiles获取一个或多个文件夹(在 folders 变量中指定)并“访问”作为参数给出的所有文件夹中的文件。它还需要一个 done channel 允许取消,但我认为这对这个问题并不重要。

当仅通过一个文件夹进行行走时,代码按预期工作。但是当给出两个时,它给了我臭名昭著的 fatal error: all goroutines are asleep - deadlock!错误。它甚至看起来通过处理两个文件夹的文件正在做正确的事情,但它并没有很好地结束。我在这个函数的并发中犯了什么(可能很明显)错误?

这是代码:

type result struct {
    path     string
    checksum []byte
    err      error
}

type FileData struct {
    Hash []byte
}

// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel.  It sends the result of the
// walk on the error channel.  If done is closed, walkFiles abandons its work.
func (p Processor) walkFiles(done <-chan struct{}, folders []string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)

    visit := func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }

        if !info.Mode().IsRegular() {
            return nil
        }

        select {
        case paths <- path:
        case <-done:
            return errors.New("walk canceled")
        }
        return nil
    }

    var wg sync.WaitGroup
    for i, folder := range folders {
        wg.Add(1)
        go func(f string, i int) {
            defer wg.Done()
            // No select needed for this send, since errc is buffered.
            errc <- filepath.Walk(f, visit)
        }(folder, i)
    }

    go func() {
        wg.Wait()
        close(paths)
    }()

    return paths, errc
}

func closeFile(f *os.File) {
    err := f.Close()

    if err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }
}

// processor reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func (p Processor) process(done <-chan struct{}, files <-chan string, c chan<- result, loc *locator.Locator) {
    for f := range files {
        func() {
            file, err := os.Open(f.path)
            if err != nil {
                fmt.Println(err)
                return
            }
            defer closeFile(file)

            // Hashing file, producing `checksum` variable, and an `err`

            select {
            case c <- result{f.path, checksum, err}:

            case <-done:
                return
            }
        }()
    }
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.  In that case,
// MD5All does not wait for inflight read operations to complete.
func (p Processor) MD5All(folders []string) (map[string]FileData, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    paths, errc := p.walkFiles(done, folders)

    c := make(chan result)
    var wg sync.WaitGroup
    wg.Add(NUM_DIGESTERS)
    for i := 0; i < NUM_DIGESTERS; i++ {
        go func() {
            p.process(done, paths, c, loc)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()
    // End of pipeline. OMIT

    m := make(map[string]FileData)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = FileData{r.checksum}
    }

    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}


func (p Processor) Start() map[string]FileData {
    m, err := p.MD5All(p.folders)
    if err != nil {
        log.Fatal(err)
    }

    return m
}

最佳答案

问题在这里:

   if err := <-errc; err != nil {
        return nil, err
    }

您正在阅读 errc只有一次,但所有 groutine 都在写入它。一旦读取了第一个完成的 goroutine 的 errc,所有其他的都被卡住等待写入它。

使用 for 循环读取。

关于遍历文件夹时 Goroutine 死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59239910/

相关文章:

go - 如何结合 channel 和 WaitGroup 的工作?

go - unstructured.UnstructuredList 导致大量 reflect.go 跟踪

networking - Go 中的点对点网络

java - 多核场景下编写java程序的技巧

concurrency - go例程中的执行顺序

objective-c - 启动自动释放的 NSOperationQueue 是否危险?

go - 如何正确使用 channel 进行并发 POST API 调用并将数据记录在文件中

go - 这里有资源泄漏吗?

python - Go 中的 gRPC 服务器与 Python 中的客户端之间的兼容性

multithreading - 我可以一次对所有 slice 项目执行操作吗?