concurrency - 关闭 channel

标签 concurrency go channel

我已经根据以下示例创建了一个简单的 channel 来发出异步 HTTP 请求:

http://matt.aimonetti.net/posts/2012/11/27/real-life-concurrency-in-go/

一旦所有请求都完成,关闭 channel 的最佳模式是什么?

type HttpRequest struct {
    url        string
}

type HttpResponse struct {
    request  HttpRequest
    response *http.Response
    err      error
}

func asyncHttpGets(requests []HttpRequest) {
    ch := make(chan *HttpResponse)
    for _, request := range requests {
        go func(url string) {
            resp, err := http.Get(url)
            ch <- &HttpResponse{request, resp, err}
        }(request.url)
    }

    for {
        select {
        case r := <-ch:
            processResponse(r)
        }
    }
}

最佳答案

这样写的代码会产生死锁。但是, channel 不一定要关闭。有多种方法可以解决这个问题。

例如,您可以将 for/select 循环替换为:

n := len(requests)
for r := range ch {
    processResponse(r)
    n--
    if n == 0 {
        break
    }
}

这里我们假设潜在的超时是在每个 goroutine 中管理的。

另一个真正依赖于关闭 channel 的解决方案可以写成如下:

func asyncHttpGets(requests []HttpRequest) {

    ch := make(chan *HttpResponse)
    var wg sync.WaitGroup
    for _, request := range requests {
        wg.Add(1)
        go func(r HttpRequest) {
            defer wg.Done()
            resp, err := http.Get(r.url)
            ch <- &HttpResponse{r, resp, err}
        }(request)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()
    for r := range ch {
        processResponse(r)
    }
}

请注意,与初始代码相比,请求变量不是从 goroutine 中访问的,而是作为参数传递的。因此,通过 channel 发布的输出数据结构是一致的。这是初始代码中的一个问题。在以下位置查看有关此特定主题的更多信息:https://github.com/golang/go/wiki/CommonMistakes

另一种解决方案是使用原子计数器对 goroutine 中的响应进行计数,并在计数器达到限制时显式关闭 channel 。但是处理 sync/atomic 通常很容易出错,所以在这里可能不是一个好主意。

最后,有时您需要获得更多控制才能正确管理超时、错误等......tomb 包可以帮助您以安全的方式管理 goroutine 的生命周期。

参见 https://github.com/go-tomb/tomb/tree/v2

关于concurrency - 关闭 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27579190/

相关文章:

ruby-on-rails - Sidekiq - 无法在 5.000 秒内获得数据库连接

mongodb - 如何使用 mgo 将 int slice 传递给 "$in"

c++ - 使用 V4L2 API 选择输入 channel

java - 什么时候应该在 GRPC 上关闭 channel ?

go - 使用无缓冲 channel 的并发问题

java - 为什么这个 ReadWriteLock 示例不起作用?

concurrency - 幕后的 Erlang 进程是什么?

json - 在 Go 中获取内部 JSON 值

golang客户端在aerospike中对cdt列表进行操作

java - 为什么 ForkJoinPool 使用 ThreadPerTaskExecutor?