go - 关闭未知长度的 channel

标签 go concurrency channel

当不知道 channel 时,我无法关闭 channel
长度

package main

import (
    "fmt"
    "time"
)

func gen(ch chan int) {
    var i int
    for {
        time.Sleep(time.Millisecond * 10)
        ch <- i
        i++
        // when no more data (e.g. from db, or event stream)
        if i > 100 {
            break
        }
    }

    // hot to close it properly?
    close(ch)
}

func receiver(ch chan int) {
    for i := range ch {
        fmt.Println("received:", i)
    }
}

func main() {
    ch := make(chan int)

    for i := 0; i < 10; i++ {
        go gen(ch)
    }

    receiver(ch)
}

它给我错误

panic: send on closed channel

goroutine 8 [running]:
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:12 +0x57
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:35 +0xbd

goroutine 1 [panicwait]:
runtime.gopark(0x0, 0x0, 0x50b8e0, 0x9, 0x10, 0x1)
    /usr/lib/go/src/runtime/proc.go:185 +0x163
runtime.main()
    /usr/lib/go/src/runtime/proc.go:121 +0x2f4
runtime.goexit()
    /usr/lib/go/src/runtime/asm_amd64.s:1696 +0x1

goroutine 6 [sleep]:
time.Sleep(0x989680)
    /usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:33 +0x79

goroutine 7 [sleep]:
time.Sleep(0x989680)
    /usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:34 +0x9b
exit status 2

这是合乎逻辑的——第一个 goroutine 在第二个 goroutine 尝试发送给它时关闭 channel 。在这种情况下关闭 channel 的最佳方法是什么?

最佳答案

一旦一个 channel 关闭,你就不能在它上面发送更多的值,否则它会崩溃。这就是您的体验。

这是因为您启动了多个使用相同 channel 的 goroutine,并且它们在该 channel 上发送值。然后您关闭每个 channel 中的 channel 。而且由于它们不同步,一旦第一个 goroutine 到达它关闭它的位置,其他 goroutine 可能(并且他们将)继续向其发送值: panic !

您只能关闭一次 channel (尝试关闭一个已经关闭的 channel 也会出现 panic )。当所有发送值的 goroutines 都完成时,你应该这样做。为此,您需要检测所有发送者 goroutine 何时完成。一种惯用的检测方法是使用 sync.WaitGroup .

对于每个启动的发送者 goroutine,我们使用 WaitGroup.Add() 将 1 添加到 WaitGroup .每个完成发送值的 goroutine 都可以通过调用 WaitGroup.Done() 来发出信号.最好将此作为延迟语句执行,因此如果您的 goroutine 突然终止(例如 panic ),WaitGroup.Done() 仍将被调用,并且不会让其他 goroutine 挂起(等待赦免) - 一个永远不会出现的“缺失”WaitGroup.Done() 调用...)。

WaitGroup.Wait()将等到所有发送者 goroutine 都完成,并且只有在此之后,它才会关闭 channel 。我们想要检测这个“全局”完成事件并在处理发送到它的值时关闭 channel ,所以我们必须在它自己的 goroutine 中执行此操作。

由于我们在 channel 上使用了 for ... range 构造,因此接收器 goroutine 将一直运行直到 channel 关闭。并且由于它在主 goroutine 中运行,因此在从 channel 正确接收和处理所有值之前,程序不会退出。 for ... range 构造循环,直到接收到在 channel 关闭之前发送的所有值。

请注意,下面的解决方案无需修改也适用于缓冲和非缓冲 channel (尝试使用带有 ch := make(chan int, 100) 的缓冲 channel )。

正确的解决方案(在 Go Playground 上尝试):

func gen(ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    var i int
    for {
        time.Sleep(time.Millisecond * 10)
        ch <- i
        i++
        // when no more data (e.g. from db, or event stream)
        if i > 100 {
            break
        }
    }
}

func receiver(ch chan int) {
    for i := range ch {
        fmt.Println("received:", i)
    }
}

func main() {
    ch := make(chan int)
    wg := &sync.WaitGroup{}

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go gen(ch, wg)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()

    receiver(ch)
}

注意:

请注意,receiver(ch) 在主 goroutine 中运行很重要,代码等待 WaitGroup 并自行关闭 channel (非主要)协程;而不是相反。如果您切换这 2 个,可能会导致“提前退出”,即并非所有值都可以从 channel 接收和处理。这是因为 Go 程序在主 goroutine 完成时退出(规范:Program execution)。它不会等待其他(非主)goroutines 完成。因此,如果在主 goroutine 中等待和关闭 channel ,则在关闭 channel 后,程序可以随时退出,而不是等待另一个 goroutine,在这种情况下,它会循环接收来自 channel 的值。

关于go - 关闭未知长度的 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34283255/

相关文章:

concurrency - GPar 的数据并行性

选择语句 channel 示例

go - 为什么这些 goroutines 的 WaitGroups 不能正常工作?

go - 在没有锁的情况下并发读取函数指针是否安全?

c++ - 是否有使用 c++11 的并发原语的合适的 wait_any 实现?

go - 关闭 channel 与发送例如一个空结构?

sql - 如何在 gorm 中创建外键?

go - 寻找一种在golang mysql中查看内插查询的方法

mongodb - 如何使用struct使用mongo-go-driver的更新功能

Golang 的 Github Oauth token