go - 如何从主线程退出

标签 go

func GoCountColumns(in chan []string, r chan Result, quit chan int) {
    for {
        select {
        case data := <-in:
            r <- countColumns(data) // some calculation function
        case <-quit:
            return // stop goroutine
        }
    }

}

func main() {
    fmt.Println("Welcome to the csv Calculator")
    file_path := os.Args[1]
    fd, _ := os.Open(file_path)
    reader := csv.NewReader(bufio.NewReader(fd))
    var totalColumnsCount int64 = 0
    var totallettersCount int64 = 0
    linesCount := 0
    numWorkers := 10000
    rc := make(chan Result, numWorkers)
    in := make(chan []string, numWorkers)
    quit := make(chan int)
    t1 := time.Now()

    for i := 0; i < numWorkers; i++ {
        go GoCountColumns(in, rc, quit)
    }
    //start worksers
    go func() {
        for {
            record, err := reader.Read()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatal(err)
            }

            if linesCount%1000000 == 0 {
                fmt.Println("Adding to the channel")
            }
            in <- record
            //data := countColumns(record)
            linesCount++
            //totalColumnsCount = totalColumnsCount + data.ColumnCount
            //totallettersCount = totallettersCount + data.LettersCount
        }
        close(in)
    }()

    for i := 0; i < numWorkers; i++ {
        quit <- 1 // quit goroutines from main
    }
    close(rc)
    for i := 0; i < linesCount; i++ {
        data := <-rc
        totalColumnsCount = totalColumnsCount + data.ColumnCount
        totallettersCount = totallettersCount + data.LettersCount
    }

    fmt.Printf("I counted %d lines\n", linesCount)
    fmt.Printf("I counted %d columns\n", totalColumnsCount)
    fmt.Printf("I counted %d letters\n", totallettersCount)
    elapsed := time.Now().Sub(t1)
    fmt.Printf("It took %f seconds\n", elapsed.Seconds())
}

My Hello World是读取csv文件并将其传递到频道的程序。然后,goroutines应该从该通道进行消耗。
我的问题是我不知道如何从主线程中检测到所有数据均已处理,并且可以退出程序。

最佳答案

在其他答案之上。

  • 请务必(注意)关闭通道应在写调用站点而不是读调用站点上进行。在GoCountColumns中,正在写入的r通道中,关闭通道的责任在GoCountColumns函数上。技术原因是,它是唯一知道该通道将不再被写入并因此可以安全关闭的演员。

  •     func GoCountColumns(in chan []string, r chan Result, quit chan int) {
            defer close(r)     // this line.
            for {
                select {
                case data := <-in:
                    r <- countColumns(data) // some calculation function
                case <-quit:
                    return // stop goroutine
                }
            }
        }
    
  • 如果我可以说,函数参数的命名约定是将目标作为第一个参数,将源作为第二个参数,以及其他参数。 GoCountColumns最好编写为:

  •     func GoCountColumns(dst chan Result, src chan []string, quit chan int) {
            defer close(dst)
            for {
                select {
                case data := <-src:
                    dst <- countColumns(data) // some calculation function
                case <-quit:
                    return // stop goroutine
                }
            }
        }
    
  • 您在进程开始后立即调用quit。它不合逻辑。此quit命令是强制退出序列,一旦检测到退出信号,就应调用该命令,以可能的最佳状态(可能已损坏)强制退出当前处理。换句话说,您应该依靠signal.Notify包捕获退出事件,并通知您的工作人员退出。参见https://golang.org/pkg/os/signal/#example_Notify

  • 为了编写更好的并行代码,请首先列出管理程序生命周期所需的例程,并确定需要阻止的例程以确保程序在退出之前已完成。

    在您的代码中,存在readmap。为了确保完整的处理,程序主要功能必须确保在map退出时捕获自身退出之前的信号。请注意,read函数无关紧要。

    然后,您还将需要从用户输入捕获退出事件所需的代码。

    总体而言,看来我们需要阻止两个事件来管理生命周期。示意地,

    func main(){
        go read()
        go map(mapDone)
        go signal()
        select {
            case <-mapDone:
            case <-sig:
        }
    }
    

    这个简单的代码对process or die很有帮助。确实,当捕获到用户事件时,程序将立即退出,而不会给其他例程提供机会在停止时执行所需的操作。

    为了改善这些行为,您首先需要一种方法来向程序发出要退出其他程序的信号,其次,需要一种等待这些程序完成其停止序列然后再离开的方法。

    要发出退出事件或取消信号,您可以使用context.Context,将其传递给工作人员,让他们听。

    再次,示意地,

    func main(){
        ctx,cancel := context.WithCancel(context.WithBackground())
        go read(ctx)
        go map(ctx,mapDone)
        go signal()
        select {
            case <-mapDone:
            case <-sig:
                cancel()
        }
    }
    

    (更多内容供以后阅读和映射)

    只要它们是线程安全的,就可以等待完成。通常,使用sync.WaitGroup。或者,在像您这样仅需要等待一个例程的情况下,我们可以重新使用当前的mapDone通道。

    func main(){
        ctx,cancel := context.WithCancel(context.WithBackground())
        go read(ctx)
        go map(ctx,mapDone)
        go signal()
        select {
            case <-mapDone:
            case <-sig:
                cancel()
                <-mapDone
        }
    }
    

    这很简单明了。但这并不完全正确。最后一个mapDone chan可能永远阻塞,并使程序无法停止。因此,您可以实现第二个信号处理程序或超时。

    从原理上讲,超时解决方案是

    func main(){
        ctx,cancel := context.WithCancel(context.WithBackground())
        go read(ctx)
        go map(ctx,mapDone)
        go signal()
        select {
            case <-mapDone:
            case <-sig:
                cancel()
                select {
                    case <-mapDone:
                    case <-time.After(time.Second):
                }
        }
    }
    

    您可能还会在最后一次选择中累积信号处理和超时。

    最后,关于readmap上下文侦听的信息很少。

    map开始,该实现需要定期读取context.Done通道以检测cancellation

    这是简单的部分,它只需要更新select语句。

        func GoCountColumns(ctx context.Context, dst chan Result, src chan []string) {
            defer close(dst)
            for {
                select {
                case <-ctx.Done():
                    <-time.After(time.Minute) // do something more useful.
                    return // quit. Notice the defer will be called.
                case data := <-src:
                    dst <- countColumns(data) // some calculation function
                }
            }
        }
    

    现在,read部分更加棘手,因为它是一个IO,它不提供能够使用select的编程接口,并且侦听上下文通道取消似乎是矛盾的。它是。由于IO受阻,因此无法监听上下文。从上下文通道读取时,无法读取IO。在您的情况下,该解决方案需要了解您的读取循环与程序寿命无关(回想一下,我们仅侦听mapDone吗?),并且我们可以忽略上下文。

    在其他情况下,例如,如果您想在读取的最后一个字节处重新启动(因此,在每次读取时,我们将增加n个字节,并在停止时保存该值)。然后,需要启动一个新的例程,因此,多个例程要等待完成。在这种情况下,sync.WaitGroup将更合适。

    示意地,

    func main(){
        var wg sync.WaitGroup
        processDone:=make(chan struct{})
        ctx,cancel := context.WithCancel(context.WithBackground())
        go read(ctx)
        wg.Add(1)
        go saveN(ctx,&wg)
        wg.Add(1)
        go map(ctx,&wg)
        go signal()
        go func(){
            wg.Wait()
            close(processDone)
        }()
        select {
            case <-processDone:
            case <-sig:
                cancel()
                select {
                    case <-processDone:
                    case <-time.After(time.Second):
                }
        }
    }
    

    在最后的代码中,正在传递等待组。例程负责调用wg.Done(),当所有例程完成后,processDone通道关闭,以发出选择信号。

        func GoCountColumns(ctx context.Context, dst chan Result, src chan []string, wg *sync.WaitGroup) {
            defer wg.Done()
            defer close(dst)
            for {
                select {
                case <-ctx.Done():
                    <-time.After(time.Minute) // do something more useful.
                    return // quit. Notice the defer will be called.
                case data := <-src:
                    dst <- countColumns(data) // some calculation function
                }
            }
        }
    

    尚不确定哪种模式是首选,但您可能还会看到waitgroup仅在 call 站点进行管理。

    func main(){
        var wg sync.WaitGroup
        processDone:=make(chan struct{})
        ctx,cancel := context.WithCancel(context.WithBackground())
        go read(ctx)
        wg.Add(1)
        go func(){
            defer wg.Done()
            saveN(ctx)
        }()
        wg.Add(1)
        go func(){
            defer wg.Done()
            map(ctx)
        }()
        go signal()
        go func(){
            wg.Wait()
            close(processDone)
        }()
        select {
            case <-processDone:
            case <-sig:
                cancel()
                select {
                    case <-processDone:
                    case <-time.After(time.Second):
                }
        }
    }
    

    除了所有这些问题和OP问题,您必须始终预先评估并行处理给定任务的相关性。没有独特的配方,可以练习和评估您的代码性能。参见pprof。

    关于go - 如何从主线程退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58898355/

    相关文章:

    git - GoLang 调试控制台应用程序

    c++ - 将 C++ 片段转换为 Golang

    go - 使用 Go-Colly 抓取时删除空行

    go - VS Code 如何在滚动智能感知建议时启用文档?

    docker - 我在为 golang api 构建 docker 时遇到问题

    go - 多个 goroutine 中的 Scanf 给出了意想不到的结果

    go - 在 Go 中如何从溢出的 int 转换中得到错误?

    go - 是否有更简洁的方法来创建在 channel 上接收后取消的上下文?

    string - 如何在 Golang 中将映射解压缩为字符串格式的关键字参数?

    go - 使用 golang/protobuf 时在数据存储中构建错误