我需要一些帮助来理解如何在这个问题中使用 goroutines。我只会发布一些代码片段,但如果您想深入了解,可以查看 here
基本上,我有一个分发函数,它接收一个被多次调用的请求 slice ,每次调用该函数时,它都必须将此请求分发给其他函数以实际解析该请求。我正在尝试创建一个 channel 并启动此函数来解决新 goroutine 上的请求,因此该程序可以同时处理请求。
分发函数的调用方式:
// Run trigger the system to start receiving requests
func Run() {
// Since the programs starts here, let's make a channel to receive requests
requestCh := make(chan []string)
idCh := make(chan string)
// If you want to play with us you need to register your Sender here
go publisher.Sender(requestCh)
go makeID(idCh)
// Our request pool
for request := range requestCh {
// add ID
request = append(request, <-idCh)
// distribute
distributor(request)
}
// PROBLEM
for result := range resultCh {
fmt.Println(result)
}
}
分发函数本身:
// Distribute requests to respective channels.
// No waiting in line. Everybody gets its own goroutine!
func distributor(request []string) {
switch request[0] {
case "sum":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "sub":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "mult":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "div":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "fibonacci":
fibCh := make(chan []string)
go fibonacci.Exec(fibCh, resultCh)
fibCh <- request
case "reverse":
revCh := make(chan []string)
go reverse.Exec(revCh, resultCh)
revCh <- request
case "encode":
encCh := make(chan []string)
go encode.Exec(encCh, resultCh)
encCh <- request
}
}
fibonacci.Exec 函数说明了我如何在给定在 fibCh 上收到的请求并通过 resultCh 发送结果值的情况下尝试计算 Fibonacci。
func Exec(fibCh chan []string, result chan map[string]string) {
fib := parse(<-fibCh)
nthFibonacci(fib)
result <- fib
}
到目前为止,在 Run 函数中,当我在 resultCh 范围内时,我得到了结果,但也出现了死锁。但为什么?另外,我想我应该使用 waitGroup 函数来等待 goroutines 完成,但我不确定如何实现它,因为我希望收到连续的请求流。如果能帮助我理解我在这里做错了什么以及解决问题的方法,我将不胜感激。
最佳答案
我没有深入研究您的应用程序的实现细节,但在我看来,您基本上可以使用 workers
模式。
使用 workers
模式,多个 goroutine 可以从单个 channel 读取数据,在 CPU 内核之间分配一定数量的工作,workers 名称由此而来。在 Go 中,这种模式很容易实现 - 只需启动一些以 channel 作为参数的 goroutine,然后将值发送到该 channel - 分发和多路复用将由 Go 运行时自动完成。
这是 worker 模式的一个简单实现:
package main
import (
"fmt"
"sync"
"time"
)
func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-tasksCh
if !ok {
return
}
d := time.Duration(task) * time.Millisecond
time.Sleep(d)
fmt.Println("processing task", task)
}
}
func pool(wg *sync.WaitGroup, workers, tasks int) {
tasksCh := make(chan int)
for i := 0; i < workers; i++ {
go worker(tasksCh, wg)
}
for i := 0; i < tasks; i++ {
tasksCh <- i
}
close(tasksCh)
}
func main() {
var wg sync.WaitGroup
wg.Add(36)
go pool(&wg, 36, 50)
wg.Wait()
}
另一个有用的资源是如何使用 WaitGroup
等待所有 goroutines 在继续之前完成执行(因此不会陷入死锁)是这篇不错的文章:
以及它的一个非常基本的实现:
如果您不想更改实现以使用 worker
模式,使用另一个 channel 来表示 goroutine 执行结束可能是个好主意,因为死锁发生在没有接收者的情况下通过无缓冲 channel 接受发送的消息。
done := make(chan bool)
//.....
done <- true //Tell the main function everything is done.
因此,当您收到消息时,您可以通过将 channel 值设置为 true 来将执行标记为已完成。
关于Goroutine实现疑惑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35474949/