go - 扇入 channel 至单 channel

标签 go channel

我有多个 channel c1、c2、c3、c4 ...,如何将这些 channel 中的所有数据收集到一个 channel 中? 我的代码:

package main

import (
    "fmt"
    "sync"
)

func putToChannel(c chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i < 6; i++ {
        c <- i
    }
}

func main() {
    c := make(chan int, 15)
    c1 := make(chan int, 5)
    c2 := make(chan int, 5)
    c3 := make(chan int, 5)
    go func(){c <- <-c1}()
    go func(){c <- <-c2}()
    go func(){c <- <-c3}()
    wg := new(sync.WaitGroup)
    wg.Add(3)
    go putToChannel(c1, wg)
    go putToChannel(c2, wg)
    go putToChannel(c3, wg)
    wg.Wait()
    close(c)
    for i := range c {
        fmt.Println("Receive:", i)
    }

    fmt.Println("Finish")
}

我想将 c1、c2 ... 到 c 的所有数据组合在一起,但它不起作用

最佳答案

This article有一篇关于如何进行 channel “扇入”(包括短暂停止)的精彩文章。

这些行有问题:

go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()

其中每一个都将从 cx 接收一个值 channel 并将该值发送到 c .

您需要一个如下所示的方法;

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

此方法依赖于以下事实: channel cs...正在传递给 merge当没有更多值要发送时关闭。

这意味着您需要更新您的 putToChannel方法还

func putToChannel(c chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    defer close(c)
    for i := 1; i < 6; i++ {
        c <- i
    }
}

最后一件值得注意的事情是,一般来说;尝试将创建并发送到 channel 的函数和关闭 channel 的函数封装为同一个函数。这意味着您永远不会尝试在封闭的 channel 上发送。

而不是:

c1 := make(chan int, 5)
go putToChannel(c1, wg)

你可以做到;

func generator() (<-chan int) {
    c := make(chan int, 5)
    go func() {
        for i := 1; i < 6; i++ {
             c <- i
        }
        close(c)
    }() 
    return c
}

您的主要方法将类似于:

func main() {
    var cs []<-chan int

    cs = append(cs, generator())
    cs = append(cs, generator())
    cs = append(cs, generator())

    c := merge(cs...)
    for v := range c {
        fmt.Println(v)
    }
}

关于go - 扇入 channel 至单 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51238972/

相关文章:

java - 如何使用 AsynchronousServerSocketChannel 绑定(bind)多个端口?

go - 在顺序执行之前等待 channel 中的 N 个项目

go - Docker Machine - 通用驱动程序不支持启动

go - "safely escaped with Go syntax"是什么意思?

测试具有相同内容的 Go 中 map 的等价性,但测试失败

go - 检查我是否可以从 channel 读取

Golang 程序未完成执行就挂起

正则表达式:Grafana 变量将连字符分隔的 IP 转换为点分隔的 IP

go - Go 中令人尴尬的并行任务的惯用解决方案是什么?

go - channel 多路复用器