func GoCountColumns(in chan []string, r chan Result, quit chan int) {
for {
select {
case data := <-in:
r <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
func main() {
fmt.Println("Welcome to the csv Calculator")
file_path := os.Args[1]
fd, _ := os.Open(file_path)
reader := csv.NewReader(bufio.NewReader(fd))
var totalColumnsCount int64 = 0
var totallettersCount int64 = 0
linesCount := 0
numWorkers := 10000
rc := make(chan Result, numWorkers)
in := make(chan []string, numWorkers)
quit := make(chan int)
t1 := time.Now()
for i := 0; i < numWorkers; i++ {
go GoCountColumns(in, rc, quit)
}
//start worksers
go func() {
for {
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
if linesCount%1000000 == 0 {
fmt.Println("Adding to the channel")
}
in <- record
//data := countColumns(record)
linesCount++
//totalColumnsCount = totalColumnsCount + data.ColumnCount
//totallettersCount = totallettersCount + data.LettersCount
}
close(in)
}()
for i := 0; i < numWorkers; i++ {
quit <- 1 // quit goroutines from main
}
close(rc)
for i := 0; i < linesCount; i++ {
data := <-rc
totalColumnsCount = totalColumnsCount + data.ColumnCount
totallettersCount = totallettersCount + data.LettersCount
}
fmt.Printf("I counted %d lines\n", linesCount)
fmt.Printf("I counted %d columns\n", totalColumnsCount)
fmt.Printf("I counted %d letters\n", totallettersCount)
elapsed := time.Now().Sub(t1)
fmt.Printf("It took %f seconds\n", elapsed.Seconds())
}
My Hello World是读取csv文件并将其传递到频道的程序。然后,goroutines应该从该通道进行消耗。
我的问题是我不知道如何从主线程中检测到所有数据均已处理,并且可以退出程序。
最佳答案
在其他答案之上。
GoCountColumns
中,正在写入的r
通道中,关闭通道的责任在GoCountColumns
函数上。技术原因是,它是唯一知道该通道将不再被写入并因此可以安全关闭的演员。 func GoCountColumns(in chan []string, r chan Result, quit chan int) {
defer close(r) // this line.
for {
select {
case data := <-in:
r <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
GoCountColumns
最好编写为: func GoCountColumns(dst chan Result, src chan []string, quit chan int) {
defer close(dst)
for {
select {
case data := <-src:
dst <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
quit
。它不合逻辑。此quit
命令是强制退出序列,一旦检测到退出信号,就应调用该命令,以可能的最佳状态(可能已损坏)强制退出当前处理。换句话说,您应该依靠signal.Notify
包捕获退出事件,并通知您的工作人员退出。参见https://golang.org/pkg/os/signal/#example_Notify 为了编写更好的并行代码,请首先列出管理程序生命周期所需的例程,并确定需要阻止的例程以确保程序在退出之前已完成。
在您的代码中,存在
read
,map
。为了确保完整的处理,程序主要功能必须确保在map
退出时捕获自身退出之前的信号。请注意,read
函数无关紧要。然后,您还将需要从用户输入捕获退出事件所需的代码。
总体而言,看来我们需要阻止两个事件来管理生命周期。示意地,
func main(){
go read()
go map(mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
}
}
这个简单的代码对
process or die
很有帮助。确实,当捕获到用户事件时,程序将立即退出,而不会给其他例程提供机会在停止时执行所需的操作。为了改善这些行为,您首先需要一种方法来向程序发出要退出其他程序的信号,其次,需要一种等待这些程序完成其停止序列然后再离开的方法。
要发出退出事件或取消信号,您可以使用
context.Context
,将其传递给工作人员,让他们听。再次,示意地,
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
}
}
(更多内容供以后阅读和映射)
只要它们是线程安全的,就可以等待完成。通常,使用
sync.WaitGroup
。或者,在像您这样仅需要等待一个例程的情况下,我们可以重新使用当前的mapDone
通道。func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
<-mapDone
}
}
这很简单明了。但这并不完全正确。最后一个mapDone chan可能永远阻塞,并使程序无法停止。因此,您可以实现第二个信号处理程序或超时。
从原理上讲,超时解决方案是
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
select {
case <-mapDone:
case <-time.After(time.Second):
}
}
}
您可能还会在最后一次选择中累积信号处理和超时。
最后,关于
read
和map
上下文侦听的信息很少。从
map
开始,该实现需要定期读取context.Done
通道以检测cancellation
。这是简单的部分,它只需要更新select语句。
func GoCountColumns(ctx context.Context, dst chan Result, src chan []string) {
defer close(dst)
for {
select {
case <-ctx.Done():
<-time.After(time.Minute) // do something more useful.
return // quit. Notice the defer will be called.
case data := <-src:
dst <- countColumns(data) // some calculation function
}
}
}
现在,
read
部分更加棘手,因为它是一个IO,它不提供能够使用select
的编程接口,并且侦听上下文通道取消似乎是矛盾的。它是。由于IO受阻,因此无法监听上下文。从上下文通道读取时,无法读取IO。在您的情况下,该解决方案需要了解您的读取循环与程序寿命无关(回想一下,我们仅侦听mapDone吗?),并且我们可以忽略上下文。在其他情况下,例如,如果您想在读取的最后一个字节处重新启动(因此,在每次读取时,我们将增加n个字节,并在停止时保存该值)。然后,需要启动一个新的例程,因此,多个例程要等待完成。在这种情况下,
sync.WaitGroup
将更合适。示意地,
func main(){
var wg sync.WaitGroup
processDone:=make(chan struct{})
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
wg.Add(1)
go saveN(ctx,&wg)
wg.Add(1)
go map(ctx,&wg)
go signal()
go func(){
wg.Wait()
close(processDone)
}()
select {
case <-processDone:
case <-sig:
cancel()
select {
case <-processDone:
case <-time.After(time.Second):
}
}
}
在最后的代码中,正在传递等待组。例程负责调用
wg.Done()
,当所有例程完成后,processDone
通道关闭,以发出选择信号。 func GoCountColumns(ctx context.Context, dst chan Result, src chan []string, wg *sync.WaitGroup) {
defer wg.Done()
defer close(dst)
for {
select {
case <-ctx.Done():
<-time.After(time.Minute) // do something more useful.
return // quit. Notice the defer will be called.
case data := <-src:
dst <- countColumns(data) // some calculation function
}
}
}
尚不确定哪种模式是首选,但您可能还会看到
waitgroup
仅在 call 站点进行管理。func main(){
var wg sync.WaitGroup
processDone:=make(chan struct{})
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
wg.Add(1)
go func(){
defer wg.Done()
saveN(ctx)
}()
wg.Add(1)
go func(){
defer wg.Done()
map(ctx)
}()
go signal()
go func(){
wg.Wait()
close(processDone)
}()
select {
case <-processDone:
case <-sig:
cancel()
select {
case <-processDone:
case <-time.After(time.Second):
}
}
}
除了所有这些问题和OP问题,您必须始终预先评估并行处理给定任务的相关性。没有独特的配方,可以练习和评估您的代码性能。参见pprof。
关于go - 如何从主线程退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58898355/