java - ReentrantLock 线程随机终止

标签 java multithreading reentrantlock extrinsic-parameters

我一直在做一项关于 Java 多线程的学校作业。我遇到的一个任务是,我们需要在不同的组中创建多个线程,并且一旦每个组中有4个线程,只有这样它们才能被释放以协同工作,否则它们必须被搁置/等待。例如:

  • 线程 a、b、c 加入组 7,它们都被搁置/等待。
  • 线程 d 加入组 7,所有四个线程(a、b、c、d)均收到终止信号。
  • 线程 e,f,g,h,i 加入组 8,在这种情况下,当线程 i 处于等待状态时,e,f,g,h 将收到终止信号。
  • 线程 j 加入组 7,处于等待状态。

这就是我完成的一般任务。我正在处理的任务要求我们释放一组中的初始前 4 个线程,其余的应该等到前面的 4 个线程调用 finish()。

例如,3 个线程加入组 65,它们被置于等待状态。另一个线程加入组 65,所有 4 个线程一起释放。现在有 4 个线程正在工作(已终止)。现在线程 e,f,g,h,i,j,k,l 加入组 65。所有线程都处于等待状态,直到 e,f,g,h 调用 finish() 方法。

这是我到目前为止所做的:

ExtrinsicSync.java:

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

public class ExtrinsicSync {

    private HashMap<Integer, ConditionWrapper> groupThreadCount;
    private ReentrantLock monitor;
    private int count = 0;

    ExtrinsicSync() {
        groupThreadCount = new HashMap<>();
        monitor = new ReentrantLock();
    }

@Override
public void waitForThreadsInGroup(int groupId) {
    monitor.lock();

    if (!groupThreadCount.containsKey(groupId))
        groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));

    ConditionWrapper condWrapper = groupThreadCount.get(groupId);
    condWrapper.setValue(condWrapper.getValue() + 1);

    if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
    {
        condWrapper.getCondition().signalAll();
        condWrapper.setInitialStatus(false);

        System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
    } else {
        System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
        try { condWrapper.getCondition().await(); }
        catch (InterruptedException e) { e.printStackTrace(); }

    }

    monitor.unlock();
}

@Override
public void finished(int groupId) {
    monitor.lock();
    ConditionWrapper condWrapper = groupThreadCount.get(groupId);

    if(!condWrapper.getInitialStatus())
    {
        condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
        System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
        if(condWrapper.getFinishedCount() == 4)
        {
            condWrapper.setFinishedCount(0);
            condWrapper.getCondition().signalAll();
            System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
        }
    }
    monitor.unlock();
}

ExtrinsicSyncTest.java:

import org.junit.Test;

import java.util.EnumMap;

class TestTask1 implements Runnable{

    final int group;
    final ExtrinsicSync s1;

    TestTask1(int group, ExtrinsicSync s1)
    {
        this.group = group;
        this.s1 = s1;
    }

    public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}

public class ExtrinsicSyncTest {

    @Test
    public void testPhaseThreethreads() {

        int nThreads = 22;

        Thread t[] = new Thread[nThreads];
        final ExtrinsicSync s1 = new ExtrinsicSync();

        for(int i = 0; i < nThreads/2; i++)
            (t[i] = new Thread(new TestTask1(66, s1))).start();

        for(int i = nThreads/2; i < nThreads; i++)
            (t[i] = new Thread(new TestTask1(70, s1))).start();

        for (Thread ti : t)
        {
            try { ti.join(100); }
            catch (Exception e) { System.out.println(e); }
        }

        EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);

        for (Thread.State s : Thread.State.values())
            threadsInThisState.put(s, 0);

        for (Thread ti : t)
        {
            Thread.State state = ti.getState();
            int n = threadsInThisState.get(state);
            threadsInThisState.put(state, n + 1);
        }

        System.out.println("threadsInThisState: " + threadsInThisState.toString() );

    }
}

ConditionWrapper.java:

import java.util.concurrent.locks.Condition;

public class ConditionWrapper {
    private Condition cond;
    private Integer value;
    private Integer finishedCount;
    private boolean initialThreads;

    public ConditionWrapper(Condition condition)
    {
        this.cond = condition;
        this.value = 0;
        this.finishedCount = 0;
        this.initialThreads = true;
    }
    // Returns the condition object of current request
    public Condition getCondition()
    {
        return this.cond;
    }
    // Gets the current counter of threads waiting in this queue.
    public Integer getValue()
    {
        return this.value;
    }
    // Sets the given value. Used for resetting the counter.
    public void setValue(int value) { this.value = value; }
    // Sets the counter to help keep track of threads which called finished() method
    public void setFinishedCount(int count) { this.finishedCount = count; }
    // Gets the finished count.
    public Integer getFinishedCount() { return this.finishedCount; }
    // This flag is to identify initial threads of a group
    public boolean getInitialStatus() { return initialThreads; }
    public void setInitialStatus(boolean val) { this.initialThreads = val; }
}

我遇到的问题是,我能够释放每个组的前四个线程,但不知何故,某处有 2 个线程被随机终止,我无法弄清楚发生了什么。例如,上面的 22 线程测试用例分为两组,只有 8 个线程应该被终止,而其余线程则等待。

但是这里有 10 个线程被终止。我不明白发生了什么事。我已尽我所能将代码精简到最低限度。

最佳答案

问题是,对于非初始线程 (getInitialStatus==false),您不会向其他线程发出信号,但当达到其中四个线程时,您仍然会终止它们。所以这就是发生的事情:

  1. 前三个线程增加计数并等待
  2. 第四个线程达到 count == 4 并设置initial = false并向所有其他线程发出信号并将计数设置为零
  3. 接下来的三个线程将计数加一
  4. 8 个线程达到 count == 4 并终止。由于 getInitialStatus==false 该线程不会通知其他线程。

因此 4*2 线程 + 2 线程被终止。正是您在测试中看到的计数。


这是实现这一点的潜在方法:

  1. 在每个线程或任务中使用标志 canExecute
  2. 使用方法calculateState来计算当前状态,如果允许线程执行,则将标志设置为true。
  3. 将所有正在等待的线程存储在列表或类似的内容中

所以你的任务将如下所示:

Task
  boolean canExeute

waitForThreadsInGroup 方法如下所示:

waitForThreadsInGroup
  monitor.lock();
      add task to list
      calculateTaskState
      condition.notifyAll
      while( ! task.canExcecute )
      {
        condition.await.
      }

  monitor.unlock();

完成方法看起来类似:

  finish
    monitor.lock();
    decrement finish count
    calculateTaskState
   condition.notifyAll
   monitor.unlock();

并计算TaskState

calculateTaskState
  if( finishCount == 0)
  {
      if( taskList.size >= 4  )
      {
         set 4 tasks in this list to can execute and remove them from the list
      }
  }

所以诀窍是将逻辑分为三个步骤:

  1. 操作,例如减少完成次数
  2. 新状态的计算。并决定每个线程是否允许执行
  3. 以及线程的等待。每个线程都需要等待自己的标志

关于java - ReentrantLock 线程随机终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61054474/

相关文章:

java - 如果 ReentrantLock 已锁定,则等待但不锁定该锁

java - 可重入ReadWriteLock限制

java - 在方法中使用 for 循环检查字符串中的数组项 - 打印 12 行而不是 1 行

multithreading - Postgres 中的并发查询

java - 为什么我的 RxJava Flowable 在使用 observeOn 时不考虑背压?

Python,将线程与多处理一起使用

java - Lambda 表达式 Java 1.8 代码到 Java 1.6

java - 如何将字符串中的 double 据转换为在 android studio 上的 TextView 上设置

java - 如何在 Java 程序中读取 JavaScript 输出文本

java - 困惑的java ThreadPool和ReentrantLock