就我而言,我有数千个 goroutine 同时作为 work()
工作。 .我也有一个 sync()
协程。当sync
开始,我需要任何其他 goroutine 在同步作业完成后暂停一段时间。这是我的代码:
var channels []chan int
var channels_mutex sync.Mutex
func work() {
channel := make(chan int, 1)
channels_mutex.Lock()
channels = append(channels, channel)
channels_mutex.Unlock()
for {
for {
sync_stat := <- channel // blocked here
if sync_stat == 0 { // if sync complete
break
}
}
// Do some jobs
if (some condition) {
return
}
}
}
func sync() {
channels_mutex.Lock()
// do some sync
for int i := 0; i != len(channels); i++ {
channels[i] <- 0
}
channels_mutex.Unlock()
}
现在的问题是,因为 <-
读取时总是阻塞,每次都转到 sync_stat := <- channel
正在阻塞。我知道如果 channel 关闭它不会被阻止,但是因为我必须使用这个 channel 直到 work()
退出,但我没有找到任何方法可以重新打开已关闭的 channel 。
我怀疑自己走错路了,因此感谢您的帮助。是否有一些“优雅”的方式来暂停和恢复任何其他 goroutine?
最佳答案
如果我理解正确,您需要 N 个 worker 和一个 Controller ,可以随意暂停、恢复和停止 worker 。下面的代码就可以做到这一点。
package main
import (
"fmt"
"runtime"
"sync"
)
// Possible worker states.
const (
Stopped = 0
Paused = 1
Running = 2
)
// Maximum number of workers.
const WorkerCount = 1000
func main() {
// Launch workers.
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int, 1)
go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}
// Launch controller routine.
go func() {
controller(workers)
wg.Done()
}()
// Wait for all goroutines to finish.
wg.Wait()
}
func worker(id int, ws <-chan int) {
state := Paused // Begin in the paused state.
for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}
default:
// We use runtime.Gosched() to prevent a deadlock in this case.
// It will not be needed of work is performed here which yields
// to the scheduler.
runtime.Gosched()
if state == Paused {
break
}
// Do actual work here.
}
}
}
// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
// Start workers
setState(workers, Running)
// Pause workers.
setState(workers, Paused)
// Unpause workers.
setState(workers, Running)
// Shutdown workers.
setState(workers, Stopped)
}
// setState changes the state of all given workers.
func setState(workers []chan int, state int) {
for _, w := range workers {
w <- state
}
}
关于go - 是否有一些优雅的方式来暂停和恢复任何其他 goroutine?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16101409/