c# - 有效锁定资源,由字符串标识

标签 c# multithreading asynchronous .net-core locking

编辑:我更新了示例以使用 https://github.com/StephenCleary/AsyncEx图书馆。仍在等待可用的提示。

有资源,由字符串标识(例如文件、URL等)。我正在寻找资源的锁定机制。我找到了 2 种不同的解决方案,但每种都有其问题:

第一个是将 ConcurrentDictionary 类与 AsyncLock 一起使用:

using Nito.AsyncEx;
using System.Collections.Concurrent;

internal static class Locking {
    private static ConcurrentDictionary<string, AsyncLock> mutexes
        = new ConcurrentDictionary<string, AsyncLock>();

    internal static AsyncLock GetMutex(string resourceLocator) {
        return mutexes.GetOrAdd(
            resourceLocator,
            key => new AsyncLock()
        );
    }
}

异步用法:

using (await Locking.GetMutex("resource_string").LockAsync()) {
    ...
}

同步用法:

using (Locking.GetMutex("resource_string").Lock()) {
    ...
}

这可以安全地工作,但问题是字典变得越来越大,而且当没有人在等待锁时,我没有看到一种线程安全的方法来从字典中删除项目。 (我也想避免全局锁。)

我的第二个解决方案将字符串散列为 0N - 1 之间的数字,并锁定这些数字:

using Nito.AsyncEx;
using System.Collections.Concurrent;

internal static class Locking {
    private const UInt32 BUCKET_COUNT = 4096;

    private static ConcurrentDictionary<UInt32, AsyncLock> mutexes
        = new ConcurrentDictionary<UInt32, AsyncLock>();

    private static UInt32 HashStringToInt(string text) {
        return ((UInt32)text.GetHashCode()) % BUCKET_COUNT;
    }

    internal static AsyncLock GetMutex(string resourceLocator) {
        return mutexes.GetOrAdd(
            HashStringToInt(resourceLocator),
            key => new AsyncLock()
        );
    }
}

可以看出,第二种解决方案只是降低了碰撞的可能性,但并没有避免碰撞。我最担心的是它会导致死锁:避免死锁的主要策略是始终按特定顺序锁定项目。但是使用这种方法,不同的项目可以以不同的顺序映射到相同的桶,比如:(A->X,B->Y),(C->Y,D->X)。因此,使用此解决方案无法安全地锁定多个资源。

有更好的解决方案吗? (我也欢迎对上述 2 种解决方案提出批评。)

最佳答案

您可以通过在字典停止使用时从字典中删除锁来改进第一个解决方案。然后可以将删除的锁添加到一个小池中,这样下次您需要锁时,您只需从池中获取一个锁,而不是创建一个新锁。


更新:这是这个想法的一个实现。它基于 SemaphoreSlim 而不是 Stephen Cleary 的 AsyncLock,因为为了从字典中删除未使用的信号量,需要自定义一次性。

public class MultiLock<TKey>
{
    private object Locker { get; } = new object();
    private Dictionary<TKey, LockItem> Dictionary { get; }
    private Queue<LockItem> Pool { get; }
    private int PoolSize { get; }

    public MultiLock(int poolSize = 10)
    {
        Dictionary = new Dictionary<TKey, LockItem>();
        Pool = new Queue<LockItem>(poolSize);
        PoolSize = poolSize;
    }

    public WaitResult Wait(TKey key,
        int millisecondsTimeout = Timeout.Infinite,
        CancellationToken cancellationToken = default)
    {
        var lockItem = GetLockItem(key);
        bool acquired;
        try
        {
            acquired = lockItem.Semaphore.Wait(millisecondsTimeout,
                cancellationToken);
        }
        catch
        {
            ReleaseLockItem(lockItem, key);
            throw;
        }
        return new WaitResult(this, lockItem, key, acquired);
    }

    public async Task<WaitResult> WaitAsync(TKey key,
        int millisecondsTimeout = Timeout.Infinite,
        CancellationToken cancellationToken = default)
    {
        var lockItem = GetLockItem(key);
        bool acquired;
        try
        {
            acquired = await lockItem.Semaphore.WaitAsync(millisecondsTimeout,
                cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            ReleaseLockItem(lockItem, key);
            throw;
        }
        return new WaitResult(this, lockItem, key, acquired);
    }

    private LockItem GetLockItem(TKey key)
    {
        LockItem lockItem;
        lock (Locker)
        {
            if (!Dictionary.TryGetValue(key, out lockItem))
            {
                if (Pool.Count > 0)
                {
                    lockItem = Pool.Dequeue();
                }
                else
                {
                    lockItem = new LockItem();
                }
                Dictionary.Add(key, lockItem);
            }
            lockItem.UsedCount += 1;
        }
        return lockItem;
    }

    private void ReleaseLockItem(LockItem lockItem, TKey key)
    {
        lock (Locker)
        {
            lockItem.UsedCount -= 1;
            if (lockItem.UsedCount == 0)
            {
                if (Dictionary.TryGetValue(key, out var stored))
                {
                    if (stored == lockItem) // Sanity check
                    {
                        Dictionary.Remove(key);
                        if (Pool.Count < PoolSize)
                        {
                            Pool.Enqueue(lockItem);
                        }
                    }
                }
            }
        }
    }

    internal class LockItem
    {
        public SemaphoreSlim Semaphore { get; } = new SemaphoreSlim(1);
        public int UsedCount { get; set; }
    }

    public struct WaitResult : IDisposable
    {
        private MultiLock<TKey> MultiLock { get; }
        private LockItem LockItem { get; }
        private TKey Key { get; }

        public bool LockAcquired { get; }

        internal WaitResult(MultiLock<TKey> multiLock, LockItem lockItem, TKey key,
            bool acquired)
        {
            MultiLock = multiLock;
            LockItem = lockItem;
            Key = key;
            LockAcquired = acquired;
        }

        void IDisposable.Dispose()
        {
            MultiLock.ReleaseLockItem(LockItem, Key);
            LockItem.Semaphore.Release();
        }
    }
}

使用示例:

var multiLock = new MultiLock<string>();
using (await multiLock.WaitAsync("SomeKey"))
{
    //...
}

未使用的信号量的默认池大小为 10。最佳值应该是使用 MultiLock 实例的并发工作人员的数量。

我在我的PC上做了一个性能测试,10个worker每秒总共可以异步获取锁500,000次(使用了20个不同的字符串标识符)。

关于c# - 有效锁定资源,由字符串标识,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60019617/

相关文章:

Java ExecutorService 无限循环作业

c++ - SDL - 如何在不停止代码执行的情况下在 C++ 中异步播放音频?

c# - 生成进程,但一次只有 5 个

c# - 如何增加 Oxyplot 中轴的绘图区域?

c# - 表达式树中的绑定(bind)参数

c# - WCF RIA 服务 - 返回两个已定义类的自定义类

java - 整数实例变量上的线程同步

c# - 数据库更新异常 : Which field is causing "String or binary data would be truncated"

java - 无法在多线程中实现同步

c# - 异步聊天服务器缓冲区问题