multithreading - 多个并发动态锁,获取锁失败则超时

标签 multithreading go locking concurrenthashmap

我有一个用例,我需要锁定函数的参数。

函数本身可以并发访问

函数签名类似于

func (m objectType) operate(key string) (bool) {
    // get lock on "key" (return false if unable to get lock in X ms - eg: 100 ms)
    // operate
    // release lock on "key"
    return true;
}

可锁定的数据空间在百万级(~1000万)

对 opera() 的并发访问在数千个范围内 (1 - 5k)

尽管在 key 存在热点的情况下(因此锁定),但预期的争用较低

实现这个的正确方法是什么?我使用并发 HashMap 探索了几个选项

  1. sync.Map - 这适用于仅附加条目且读取比写入高的情况。因此不适用于此处
  2. 分片 HashMap ,其中每个分片均由 RWMutex 锁定 - https://github.com/orcaman/concurrent-map - 虽然这可行,但并发性受到分片数量的限制,而不是键之间的实际争用。当 key 子集发生大量争用时,也不会启用超时场景

虽然超时是 P1 要求,但 P0 要求是在可能的情况下通过粒度锁定来增加吞吐量。

有什么好的方法可以实现这一点吗?

最佳答案

我会通过使用缓冲 channel 的映射来做到这一点:

  • 要获取“互斥体”,请尝试用值填充缓冲 channel
  • 工作
  • 完成后,清空缓冲 channel ,以便另一个 goroutine 可以使用它

示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

type MutexMap struct {
    mut     sync.RWMutex        // handle concurrent access of chanMap
    chanMap map[int](chan bool) // dynamic mutexes map
}

func NewMutextMap() *MutexMap {
    var mut sync.RWMutex
    return &MutexMap{
        mut:     mut,
        chanMap: make(map[int](chan bool)),
    }
}

// Acquire a lock, with timeout
func (mm *MutexMap) Lock(id int, timeout time.Duration) error {
    // get global lock to read from map and get a channel
    mm.mut.Lock()
    if _, ok := mm.chanMap[id]; !ok {
        mm.chanMap[id] = make(chan bool, 1)
    }
    ch := mm.chanMap[id]
    mm.mut.Unlock()

    // try to write to buffered channel, with timeout
    select {
    case ch <- true:
        return nil
    case <-time.After(timeout):
        return fmt.Errorf("working on %v just timed out", id)
    }
}

// release lock
func (mm *MutexMap) Release(id int) {
    mm.mut.Lock()
    ch := mm.chanMap[id]
    mm.mut.Unlock()
    <-ch
}

func work(id int, mm *MutexMap) {
    // acquire lock with timeout
    if err := mm.Lock(id, 100*time.Millisecond); err != nil {
        fmt.Printf("ERROR: %s\n", err)
        return
    }
    fmt.Printf("working on task %v\n", id)
    // do some work...
    time.Sleep(time.Second)
    fmt.Printf("done working on %v\n", id)

    // release lock
    mm.Release(id)
}

func main() {
    mm := NewMutextMap()
    var wg sync.WaitGroup
    for i := 0; i < 50; i++ {
        wg.Add(1)
        id := i % 10
        go func(id int, mm *MutexMap, wg *sync.WaitGroup) {
            work(id, mm)
            defer wg.Done()
        }(id, mm, &wg)
    }
    wg.Wait()
}

编辑:不同的版本,我们还处理对 chanMap 本身的并发访问

关于multithreading - 多个并发动态锁,获取锁失败则超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68827543/

相关文章:

multithreading - 具有快速元素删除和随机获取的线程安全可变集合

go - 可以在一个 FlatBuffers 缓冲区中编码多个表吗?

ios - 如何在任务切换器中显示锁定屏幕并正确恢复

c++ - 什么是无锁多线程编程?

java - 即使线程被声明为 null JAVA,线程仍继续运行

java - Android - 线程未执行

go - net.Listen 和 http.ListenAndServe 功能的区别

c - 使用 Go 1.5 buildmode=c-archive 和从 C 链接的 net/http.Server

c# - 事务内的 FOR UPDATE 不锁定行

python - 当打印位于外锁内时需要 Printer_lock