我有一个用例,我需要锁定函数的参数。
函数本身可以并发访问
函数签名类似于
func (m objectType) operate(key string) (bool) {
// get lock on "key" (return false if unable to get lock in X ms - eg: 100 ms)
// operate
// release lock on "key"
return true;
}
可锁定的数据空间在百万级(~1000万)
对 opera() 的并发访问在数千个范围内 (1 - 5k)
尽管在 key 存在热点的情况下(因此锁定),但预期的争用较低
实现这个的正确方法是什么?我使用并发 HashMap 探索了几个选项
- sync.Map - 这适用于仅附加条目且读取比写入高的情况。因此不适用于此处
- 分片 HashMap ,其中每个分片均由 RWMutex 锁定 - https://github.com/orcaman/concurrent-map - 虽然这可行,但并发性受到分片数量的限制,而不是键之间的实际争用。当 key 子集发生大量争用时,也不会启用超时场景
虽然超时是 P1 要求,但 P0 要求是在可能的情况下通过粒度锁定来增加吞吐量。
有什么好的方法可以实现这一点吗?
最佳答案
我会通过使用缓冲 channel 的映射来做到这一点:
- 要获取“互斥体”,请尝试用值填充缓冲 channel
- 工作
- 完成后,清空缓冲 channel ,以便另一个 goroutine 可以使用它
示例:
package main
import (
"fmt"
"sync"
"time"
)
type MutexMap struct {
mut sync.RWMutex // handle concurrent access of chanMap
chanMap map[int](chan bool) // dynamic mutexes map
}
func NewMutextMap() *MutexMap {
var mut sync.RWMutex
return &MutexMap{
mut: mut,
chanMap: make(map[int](chan bool)),
}
}
// Acquire a lock, with timeout
func (mm *MutexMap) Lock(id int, timeout time.Duration) error {
// get global lock to read from map and get a channel
mm.mut.Lock()
if _, ok := mm.chanMap[id]; !ok {
mm.chanMap[id] = make(chan bool, 1)
}
ch := mm.chanMap[id]
mm.mut.Unlock()
// try to write to buffered channel, with timeout
select {
case ch <- true:
return nil
case <-time.After(timeout):
return fmt.Errorf("working on %v just timed out", id)
}
}
// release lock
func (mm *MutexMap) Release(id int) {
mm.mut.Lock()
ch := mm.chanMap[id]
mm.mut.Unlock()
<-ch
}
func work(id int, mm *MutexMap) {
// acquire lock with timeout
if err := mm.Lock(id, 100*time.Millisecond); err != nil {
fmt.Printf("ERROR: %s\n", err)
return
}
fmt.Printf("working on task %v\n", id)
// do some work...
time.Sleep(time.Second)
fmt.Printf("done working on %v\n", id)
// release lock
mm.Release(id)
}
func main() {
mm := NewMutextMap()
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
id := i % 10
go func(id int, mm *MutexMap, wg *sync.WaitGroup) {
work(id, mm)
defer wg.Done()
}(id, mm, &wg)
}
wg.Wait()
}
编辑:不同的版本,我们还处理对 chanMap 本身的并发访问
关于multithreading - 多个并发动态锁,获取锁失败则超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68827543/