java - 使用 ReentrantLock 实现阻塞并发

标签 java multithreading concurrency reentrantlock

我正在尝试实现一个类以通过使用 RentrantLock 阻止异步尝试修改实体的给定实例(由 key 标识)来在我的 Java 应用程序中强制执行并发。目标是阻止/排队多个并发尝试修改对象的给定实例,直到先前的线程完成。该类以通用方式实现这一点,允许任何代码块获得锁并在完成后释放它(与 RentrantLock 语义相同),并增加了仅阻塞线程试图修改对象的相同实例(如标识通过一个键)而不是阻止所有线程进入代码块。

此类提供了一个简单的构造,允许仅为实体的一个实例同步代码块。例如,如果我希望为来自 ID 为 33 的用户的所有线程同步一段代码,但我不希望来自任何其他用户的线程被为用户 33 提供服务的线程阻塞。

类实现如下

public class EntitySynchronizer {
  private static final int DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS = 300; // 5 minutes
  private Object mutex = new Object();
  private ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<Object, ReentrantLock>();
  private static ThreadLocal<Object> keyThreadLocal = new ThreadLocal<Object>();
  private int maximumLockDurationSeconds;
  public EntitySynchronizer() {
    this(DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS);
  }
  public EntitySynchronizer(int maximumLockDurationSeconds) {
    this.maximumLockDurationSeconds = maximumLockDurationSeconds;
  }
  /**
   * Initiate a lock for all threads with this key value
   * @param key the instance identifier for concurrency synchronization
   */
  public void lock(Object key) {
    if (key == null) {
      return;
    }
    /*
     * returns the existing lock for specified key, or null if there was no existing lock for the
     * key
     */
    ReentrantLock lock;
    synchronized (mutex) {
      lock = locks.putIfAbsent(key, new ReentrantLock(true));
      if (lock == null) {
        lock = locks.get(key);
      }
    }
    /*
     * Acquires the lock and returns immediately with the value true if it is not held by another
     * thread within the given waiting time and the current thread has not been interrupted. If this
     * lock has been set to use a fair ordering policy then an available lock will NOT be acquired
     * if any other threads are waiting for the lock. If the current thread already holds this lock
     * then the hold count is incremented by one and the method returns true. If the lock is held by
     * another thread then the current thread becomes disabled for thread scheduling purposes and
     * lies dormant until one of three things happens: - The lock is acquired by the current thread;
     * or - Some other thread interrupts the current thread; or - The specified waiting time elapses
     */
    try {
      /*
       * using tryLock(timeout) instead of lock() to prevent deadlock situation in case acquired
       * lock is not released normalRelease will be false if the lock was released because the
       * timeout expired
       */
      boolean normalRelease = lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS);
      /*
       * lock was release because timeout expired. We do not want to proceed, we should throw a
       * concurrency exception for waiting thread
       */
      if (!normalRelease) {
        throw new ConcurrentModificationException(
            "Entity synchronization concurrency lock timeout expired for item key: " + key);
      }
    } catch (InterruptedException e) {
      throw new IllegalStateException("Entity synchronization interrupted exception for item key: "
          + key);
    }
    keyThreadLocal.set(key);
  }
  /**
   * Unlock this thread's lock. This takes care of preserving the lock for any waiting threads with
   * the same entity key
   */
  public void unlock() {
    Object key = keyThreadLocal.get();
    keyThreadLocal.remove();
    if (key != null) {
      ReentrantLock lock = locks.get(key);
      if (lock != null) {
        try {
          synchronized (mutex) {
            if (!lock.hasQueuedThreads()) {
              locks.remove(key);
            }
          }
        } finally {
          lock.unlock();
        }
      } else {
        synchronized (mutex) {
          locks.remove(key);
        }
      }
    }
  }
}

这个类的用法如下:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer();
entitySynchronizer.lock(userId);  // 'user' is the entity by which i want to differentiate my synchronization
try {
  //execute my code here ...
} finally {
  entitySynchronizer.unlock();
}

问题是它不能完美地工作。在非常高的并发负载下,仍然存在一些情况,其中具有相同 key 的多个线程未被同步。我已经通过并进行了相当彻底的测试,无法弄清楚为什么/在哪里会发生这种情况。

那里有并发专家吗?

最佳答案

您应该解决的问题之一是:

ReentrantLock lock;
synchronized (mutex) {
  lock = locks.putIfAbsent(key, new ReentrantLock(true));
  if (lock == null) {
    lock = locks.get(key);
  }
}

这错过了并发映射的全部要点。你为什么不这样写:

ReentrantLock lock = new ReentrantLock(true);
final ReentrantLock oldLock = locks.putIfAbsent(key, lock);
lock = oldLock != null? oldLock : lock;

关于java - 使用 ReentrantLock 实现阻塞并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10427780/

相关文章:

multithreading - 如何创建可在 Rust 多线程服务器中使用的结构?

java - 从辅助线程在主线程上运行代码?

java - 从 Java 字符串中去除前导和尾随空格

java - Android 中如何知道线程是否正在运行

java - 创建和加入线程时副作用的可见性

c# - SignalAndWait用于锁定上下文

java - 仅在读取时是否必须使用线程安全的 Map 实现?

java - 使用 DocuSign AuthenticationApi.login() 进行旧版身份验证时出错 - 缺少 grant_type/code

java - 将内核模型传输到 Java 中的输出模型

java - OSGi 和 native 事件