go - worker 循环中使用多WG的正确方法是吗?

标签 go channel goroutine

在这种情况下,我想使用goroutine worker保存数据,然后等待保存数据完成,然后将新函数执行到操作数据,在这种情况下

var s struct {
  jobs chan Data
}

func allocateJob([] Data) {
    for _, d := range data {
        s.jobs <- d
    }
    close(s.jobs)
}

func Foo() (err error) {
 resultData = GetData()

 s.jobs = make(chan Data, NumOfWorkers)
 go allocateJob(resultData)

 var wg sync.WaitGroup
 for i := 1; i <= NumOfWorkers; i++ {
    wg.Add(1)
    go func() {
        for job := range jobs {
            err = s.saveData(ctx, job) // i want to wait thise till finish  save all data
            wg.Done()
            err = s.ManipulateDataSomething(ctx, job)
            wg.Done()
        }
        wg.Done()
    }()
 }
 wg.Wait()
 return err
}
有可能这样做和正确的方式吗?
我对并发性和goroutine非常陌生,希望我问的有道理

最佳答案

这是一个非常简单的示例:

package main

import (
    "fmt"
    "sync"
)

type job struct {
    do func()
}

func (j job) Do() {
    if j.do != nil {
        j.do()
    }
}

type workerPool struct {
    workers []worker
    stop chan struct{}
    jobs chan job
}

func newWorkerPool(numWorkers int) *workerPool {
    if numWorkers < 1 {
        numWorkers = 1
    }
    
    // stop denotes a channel to reclaim goroutine spawned by each workers.
    stop := make(chan struct{}, 1)
    
    // jobs denotes a job queue which able to queue at most 100 jobs.
    jobs := make(chan job, 100)
    
    // workers denotes a worker thread for concurrent processing jobs.
    workers := make([]worker, numWorkers)
    for i := range workers {
        workers[i] = worker {
            stop: stop,
            jobs: jobs,
        }
    }
    
    return &workerPool {
        workers: workers,
        stop: stop,
        jobs: jobs,
    }
}

// Start spawns multiple worker routines.
func (wp *workerPool) Start() {
    for i := range wp.workers {
        wp.workers[i].Start()
    }
}

// Stop reclaim goroutine spawned each worker.
func (wp *workerPool) Stop() {
    close(wp.stop)
}

// Do create a job and queue it to a job queue.
func (wp *workerPool) Do(fn func()) {
    wp.jobs <- job{do:fn}
}

type worker struct {
    stop  chan struct{}
    jobs  chan job
}

func (w *worker) Start() {
    go w.start()
}

func (w *worker) start() {
    for {
        select {
        case <-w.stop:
            return
        case job := <-w.jobs:
            job.Do()
        }
    }
}

func main() {

    // Create a worker pool with 4 workers inside.
    wp := newWorkerPool(4)
    
    // Start the workerpool to tell workers prepare to work.
    wp.Start()
    defer wp.Stop()
    
    // Using this wait group to wait until all of say hello jobs are processed.
    var helloWg sync.WaitGroup
    
    // Using this wait group to wait until all of say hi jobs are processed.
    var hiWg sync.WaitGroup
    
    // Define function of saying hello.
    sayHello := func() { 
        defer helloWg.Done()
        fmt.Println("Hello")
    }
    
    // Define function of saying hi.
    sayHi := func() {
        defer hiWg.Done()
        fmt.Println("Hi")
    }
    
    // Let's say hello 5 times.
    for i := 0 ; i < 5 ; i++ {
        helloWg.Add(1)
        wp.Do(sayHello)
    }
    
    // Let's say hi 3 times.
    go func() {
        for i := 0 ; i < 3 ; i++ {
            hiWg.Add(1)
            wp.Do(sayHi)
        }
    }()
    
    // Wait for all say hello jobs.
    helloWg.Wait()
    
    // Wait for all say hi jobs.
    hiWg.Wait()
}

playgound

关于go - worker 循环中使用多WG的正确方法是吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64061527/

相关文章:

multithreading - Goroutines - 为什么我只在最后看到并排执行

windows - GAE 转到 Windows - "Cannot run program", "is not a valid Win32 application"

go - Go 中的解码返回空白输出

go - 如何在没有隐式缓冲区的情况下将值从一个 Go channel 传输到另一个 channel ?

MySQL - 选择与表中日期最接近的记录并连接到主数据集

go - 如何使用 channel 在 go 例程之间传递 byte slice

linux - 将结构体实例重新声明为相同的变量名将返回 golang 中的旧对象

mysql - 使用 GORM 在 MySQL 中获取 NULL 日期时间值

go - 这台机器上最有效的 goroutines 数量

go - 当 main 退出时,goroutines 会运行 defer() 吗?