concurrency - 为高并发应用程序实现全局计数器的最佳方法?

标签 concurrency go

为高并发应用程序实现全局计数器的最佳方法是什么?就我而言,我可能有 10K-20K 的例程执行“工作”,我想计算例程共同处理的项目的数量和类型......

“经典”同步编码风格如下所示:

var work_counter int

func GoWorkerRoutine() {
    for {
        // do work
        atomic.AddInt32(&work_counter,1)
    }    
}

现在这变得更复杂了,因为我想跟踪正在完成的工作的“类型”,所以我真的需要这样的东西:

var work_counter map[string]int
var work_mux sync.Mutex

func GoWorkerRoutine() {
    for {
        // do work
        work_mux.Lock()
        work_counter["type1"]++
        work_mux.Unlock()
    }    
}

似乎应该有一种使用 channel 或类似的“go”优化方式:

var work_counter int
var work_chan chan int // make() called somewhere else (buffered)

// started somewher else
func GoCounterRoutine() {
    for {
        select {
            case c := <- work_chan:
                work_counter += c
                break
        }
    }
}

func GoWorkerRoutine() {
    for {
        // do work
        work_chan <- 1
    }    
}

最后一个示例仍然缺少 map ,但添加起来很容易。这种风格会比简单的原子增量提供更好的性能吗?当我们谈论对全局值的并发访问与可能阻止 I/O 完成的事情时,我无法判断这是否或多或少复杂......

感谢您的想法。

2013 年 5 月 28 日更新:

我测试了几个实现,结果不是我预期的,这是我的计数器源代码:

package helpers

import (
)

type CounterIncrementStruct struct {
    bucket string
    value int
}

type CounterQueryStruct struct {
    bucket string
    channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
    counter = make(map[string]int)
    counterIncrementChan = make(chan CounterIncrementStruct,0)
    counterQueryChan = make(chan CounterQueryStruct,100)
    counterListChan = make(chan chan map[string]int,100)
    go goCounterWriter()
}

func goCounterWriter() {
    for {
        select {
            case ci := <- counterIncrementChan:
                if len(ci.bucket)==0 { return }
                counter[ci.bucket]+=ci.value
                break
            case cq := <- counterQueryChan:
                val,found:=counter[cq.bucket]
                if found {
                    cq.channel <- val
                } else {
                    cq.channel <- -1    
                }
                break
            case cl := <- counterListChan:
                nm := make(map[string]int)
                for k, v := range counter {
                    nm[k] = v
                }
                cl <- nm
                break
        }
    }
}

func CounterIncrement(bucket string, counter int) {
    if len(bucket)==0 || counter==0 { return }
    counterIncrementChan <- CounterIncrementStruct{bucket,counter}
}

func CounterQuery(bucket string) int {
    if len(bucket)==0 { return -1 }
    reply := make(chan int)
    counterQueryChan <- CounterQueryStruct{bucket,reply}
    return <- reply
}

func CounterList() map[string]int {
    reply := make(chan map[string]int)
    counterListChan <- reply
    return <- reply
}

它使用 channel 进行写入和读取,这似乎是合乎逻辑的。

这是我的测试用例:

func bcRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkChannels(b *testing.B) {
    b.StopTimer()
    CounterInitialize()
    e:=make(chan bool)
    b.StartTimer()

    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
    mux.Lock()
    m[bucket]+=value
    mux.Unlock()
}

func bmRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkMutex(b *testing.B) {
    b.StopTimer()
    m=make(map[string]int)
    e:=make(chan bool)
    b.StartTimer()

    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }

    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

我实现了一个简单的基准测试,只在 map 周围使用了一个互斥锁(只是测试写入),并使用 5 个并行运行的 goroutine 对两者进行了基准测试。结果如下:

$ go test --bench=. helpers
PASS
BenchmarkChannels         100000             15560 ns/op
BenchmarkMutex   1000000              2669 ns/op
ok      helpers 4.452s

我没想到互斥锁会这么快......

还有什么想法?

最佳答案

如果您尝试同步一组工作人员(例如,允许 n 个 goroutine 处理一些工作),那么 channel 是一种非常好的方法,但如果您真正需要的只是一个计数器(例如页面浏览量),那么它们就太过分了。 syncsync/atomic软件包可以提供帮助。

import "sync/atomic"

type count32 int32

func (c *count32) inc() int32 {
    return atomic.AddInt32((*int32)(c), 1)
}

func (c *count32) get() int32 {
    return atomic.LoadInt32((*int32)(c))
}

Go Playground Example

关于concurrency - 为高并发应用程序实现全局计数器的最佳方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16783273/

相关文章:

google-app-engine - AppEngine 的 datastore.Get() 是否不验证所请求 key 的命名空间?

java - 当线程更改状态时,有什么方法可以在进程中获取通知吗?

java - 在一段时间内阻塞对对象的方法调用

c - wait() 系统调用 - child 会忽略它吗?

go - 如何通过接口(interface)获取指针的值类型?

json - 是否可以有一个包含多个 JSON 标签的结构?

ios - 使用 NSOperationQueue,如何添加到后台队列而不是主队列,以及如何控制操作量?

java - 在事件调度线程中显示 JWindow

go - 在 Goroutine 中更新值

macos - 安全 : SecKeychainSearchCopyNext Error - Unable to Install Delve on MacOS