java - 如何同步操作?

标签 java multithreading locking deadlock synchronized

我有这个两阶段锁定的实现。问题是它在大多数情况下都能完美工作,但并非在所有情况下都可以。我发现问题是由于我使用 Synchronized 造成的,而 Synchronized 的使用方式是错误的!我需要的操作顺序如下:

  1. 解锁任何锁的线程应该在任何其他线程开始锁定同一锁之前完全完成此阶段(ul 方法)。
  2. 唤醒时等待锁的线程应该在开始执行其操作之前获取该锁的所有等待者。
  3. 任何 addEdge 或 removeEdge 都需要同步,因为我在所有线程中使用相同的图形对象。

这是代码:

class LockTable{
     private HashMap<Integer,MyLock> locks;
     public LockTable(){
         locks= new HashMap<Integer,MyLock>();   
    }

/*******************************************************************************
 * Acquire a shared/read lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void rl (int tid, int oid) throws InterruptedException
{

    MyLock lock=null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock

            if((lock != null) && (lock.lock.isWriteLocked())){

               wait = true;
            }
            if(lock == null){
             getIt = true;
             lock = new MyLock(tid, true);
             lock.lock.readLock().lock();
             lock.readers.add(tid);
             locks.put(oid, lock);
          }

        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
    }//synch
    if (wait){
        synchronized(this){
            //System.out.println("Transaction " + tid + " is waiting..");
            lock.waiters.add(tid);
            lock.waitersType.add('s');
            Main.g.addEdge(tid, lock.tid);
            //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
        }//sync
         if(Main.g.hasCycle()){
            //System.out.println("Detect Cycle in rl..Executing Restart");
            restart(tid);
        }
        //to exclude the restarted thread
        if(!Main.trans[tid].terminate){
            lock.lock.readLock().lock();
            lock.readers.add(tid);
          synchronized(this){
            lock.waitersType.remove(lock.waiters.indexOf(tid));
            lock.waiters.remove(lock.waiters.indexOf(tid));
            }//sync
            for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
            }//for
        }//if terminate
        else
            return;
    }
   else
    if(!getIt){
              //System.out.println("Getting Shared Lock without Waiting");
              lock.lock.readLock().lock();
              lock.readers.add(tid);
              for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
              } 
    }

} // rl

/*******************************************************************************
 * Acquire an exclusive/write lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void wl (int tid, int oid) throws InterruptedException
{
     //type to determine the last lock type in order
     //to be able to remove the edges from waitfor graph
    int type = 0;
    MyLock lock = null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock
            if(lock != null && (lock.lock.isWriteLocked() || lock.readers.size() > 0))
            {
                wait = true;
            }
            if(lock == null){
                getIt = true;
                lock = new MyLock(tid);
                lock.lock.writeLock().lock();
                locks.put(oid,lock);
            }
        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
  }
     if (wait){
            //System.out.println("Transaction " + tid + " is waiting..");
           synchronized(this) {
            if(lock.lock.isWriteLocked()){
                Main.g.addEdge(tid, lock.tid);
                //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
               }
            else{
                type = 1;
                for(int reader : lock.readers){
                     Main.g.addEdge(tid, reader);
                     //System.out.println("Edge has been added "+tid + "==>" + reader);
                }
                }//else
            if(Main.g.hasCycle()){
               //System.out.println("Detect Cycle");
               restart(tid);
             }//if
            //System.out.println("Graph: "+Main.g.toString());
         }//sync



       if(!Main.trans[tid].terminate){
            synchronized(this){
            lock.waiters.add(tid);
            lock.waitersType.add('w');
           }
            //System.out.println("I'm waiting here in wl");
            lock.lock.writeLock().lock();
            lock.tid = tid;
            //System.out.println("Wakeup..");
            synchronized(this){
                lock.waitersType.remove(lock.waiters.indexOf(tid));
                lock.waiters.remove(lock.waiters.indexOf(tid));
                //System.out.println("the number of waiters after wakeup: " + lock.waiters.size());
           }//sync
                for(int i =0 ; i < lock.waiters.size();i++){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                   // System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    //System.out.println("Graph: "+Main.g.toString());
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//for
         }
        else
            return; 
    }// if(wait) ==> for the lock to be released
    else
        if(!getIt){
            lock.lock.writeLock().lock();
            lock.tid = tid;
            for(int i =0 ; i < lock.waiters.size();i++){
                synchronized(Main.g){
                Main.g.addEdge(lock.waiters.get(i), tid);
                //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                }
                if(Main.g.hasCycle())
                    restart(lock.waiters.get(i));
            }//for

     }

} // wl

void restart(int tid){
 synchronized(this) {
    Main.rollBack++;
    MyLock lock;
    List<Integer> toRemove = new ArrayList();
    for(int i : locks.keySet()){
       lock = locks.get(i);
       //for all the locks in the lock table delete the restarted thread from the waiters list
       if(lock.waiters.contains(tid)){
           lock.waitersType.remove(lock.waiters.indexOf(tid));
           lock.waiters.remove(lock.waiters.indexOf(tid));
        }

           //lock.sem.release();
           if(lock.lock.isWriteLockedByCurrentThread()){


               //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), lock.tid);
               //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
               lock.lock.writeLock().unlock();
                //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());
               toRemove.add(i);

           }
       if(!lock.lock.isWriteLocked())
           if(lock.readers.contains(tid) && lock.lock.getReadLockCount()>0){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), tid);
                   // lock.numberOfReaders --;
                  //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
                  lock.readers.remove(lock.readers.indexOf(tid));
                  lock.lock.readLock().unlock();
                  //System.out.println("number of write holders: " + lock.lock.getWriteHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
                  toRemove.add(i);  

              }//if
    }//for
    Main.g.removeEdges(tid);
    Main.trans[tid].terminate = true;
    //System.out.println("Transaction" + tid + " restarted");

    }//sync
}

/*******************************************************************************
 * Unlock/release the lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void ul (int tid, int oid)
{
   MyLock lock = null;
    boolean error = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);                    // find the lock
            if( lock == null)
                System.out.println("println: lock not found");
        } catch(Exception e) {
            System.out.println("lock not found");   // lock not found
        } // try
    }
        if((lock != null) && (lock.lock.isWriteLockedByCurrentThread())){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), lock.tid);
                      }//sync
                   //System.out.println("tid: " + tid + " unlock object: " + oid);
                   lock.lock.writeLock().unlock();
                  //print out 
                  //System.out.println("done with unlock");
                  //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
      }// if lock != null
        else
          if((lock != null) && (lock.readers.size()>0)){
              synchronized(this){
              if(lock.readers.contains(tid)){
                lock.readers.remove(lock.readers.indexOf(tid));
                }
                //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), tid);
                      }
                lock.lock.readLock().unlock();
                //System.out.println("Transaction"+tid+" unlocked shared lock on object "+oid);
                //System.out.println("number of write holders: " + lock.lock.readLock().);
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());

              }//if lock.readers
          }//if
    if (error) 
        System.out.println ("Error: ul: no lock for oid = " + oid + " found/owned");
} // ul

最佳答案

使用 synchronized 时存在一些问题。

  1. 除了极少数情况外,对共享对象的所有读/写(尤其是写)访问都必须在同一对象上同步。当您在某些地方写入 Main.g 时,您正在同步它,但也有一些地方不同步(例如,在 wl 方法的 if(wait) 内部)。

  2. 据我了解您代码的目标,lock.{waiters|readers} 的状态必须与图表的状态 (Main.g)同步。这意味着每当您对 waiters 进行更改,然后相应地更新 Main.g 时,这两个操作都必须以原子方式执行。为此,您需要将它们包装在一个完整的 synchronized 语句中。我可以看到你理解这个概念,因为你在某些地方这样做了,但你似乎在其他地方错过了它。例如:在 rl 方法中,在 if(!getIt){ 内更新 lock.readers,然后更新 Main.g,无需任何同步。

总的来说,我无法给你任何具体的信息,因为代码非常复杂,所以很难说出它的意图是什么。但我认为您可以通过锁定代码的较大部分来解决一些锁定问题。例如,您可以通过在方法声明中添加 synchronized 关键字来同步 this 上的整个方法 rlwlul,并摆脱方法内的所有细粒度锁定。细粒度锁定可以为您提供更好的性能,但代价是高复杂性。我建议您首先从更简单的锁定方案开始,然后逐渐改进它们(如果您确实需要的话!)。

关于java - 如何同步操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5611511/

相关文章:

java - SQL数据库查询语法错误: "Encountered YEAR at line 1"

c++ - Boost Interprocess named_mutex 作为类成员

c - 当按值传递静态变量时会导致 c 中的竞争条件

linux - linux内核模块的读/写锁

java - 如何使用 setVisible(false) 从 JFrame 创建图像?

java - Window Batch 文件集成到 Java GUI 中

java - 尝试将查询结果集放入 map 时抛出意外的 NPE

c++ - asio::io_service 工作立即结束

c# - 从多个工作线程 (.NET) 更新 UI

python - 终止长时间运行的 python 线程