java - 为什么自定义阻塞队列在 Java 中不是线程安全的

标签 java multithreading concurrency

我只是想用ReentrantLock实现一个阻塞队列,我定义了两个条件fullempty,源码如下:

@Slf4j
@NotThreadSafe
public class CustomBlockQueue<T> {

    private ReentrantLock lock = new ReentrantLock();

    private Condition full = lock.newCondition();
    private Condition empty = lock.newCondition();

    private Integer maxLength = 1 << 4;

    private Integer putIndex = 0, takeIndex = 0;
    private Integer count = 0;

    private Object[] value;

    public BlockQueue(){
        value = new Object[maxLength];
    }

    public BlockQueue(Integer maxLength){
        this.maxLength = maxLength;
        value = new Object[maxLength];
    }

    public void put(T val) throws InterruptedException {
        lock.lock();
        try {
            if (count.equals(maxLength)){
                log.info("The queue is full!");
                full.await();
            }
            putIndex = putIndex % maxLength;
            value[putIndex++] = val;
            count++;
            empty.signal();
        }finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        Object val;
        try {
            if (count == 0){
                empty.await();
            }
            takeIndex = takeIndex % maxLength;
            val = value[takeIndex++];
            count--;
            full.signal();
        }finally {
           lock.unlock();
        }
        return (T) val;
    }
}

在两个消费者线程和一个提供者线程中测试时,count 在某些意外时间小于零。
为什么阻塞队列不是线程安全的,谁能帮帮我,给我一些指导?非常感谢你!

更新(2018/10/17)

如果我只使用一个Condition,它能正常运行吗?源码如下:

@Slf4j
@NotThreadSafe
public class CustomBlockQueue<T> {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    ...

    public void put(T val) throws InterruptedException {
        lock.lock();
        try {
            while (count.equals(maxLength)){
                log.info("The queue is full!");
                condition.await();
            }
            putIndex = putIndex % maxLength;
            value[putIndex++] = val;
            count++;
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        Object val;
        try {
            while (count == 0){
                condition.await();
            }
            takeIndex = takeIndex % maxLength;
            val = value[takeIndex++];
            count--;
            condition.signal();
        }finally {
           lock.unlock();
        }
        return (T) val;
    }
}

最佳答案

类似的问题:Why should wait() always be called inside a loop

解释

考虑这种情况:

  1. 消费者 1lock.lock(); 上被阻塞
  2. 消费者 2empty.await(); 上被阻止。
  3. producer 持有锁并向队列中添加一个元素,这使得 count = 1 并调用 empty.signal();
  4. consumer 2收到这个信号,从empty.await();中唤醒,需要重新获取锁,而cosumer 1 领先于它。
  5. cosumer 1 获得锁并发现计数为 1,因此它将计数减为 0。
  6. cosumer 2 获得锁,因为它已经执行了

    if (count == 0){    <--- consumer 2 will not re-check this condition
        empty.await();  
    }
    

    cosumer 2 认为队列不为空,然后执行:

    takeIndex = takeIndex % maxLength;
    val = value[takeIndex++];
    count--;
    

    这使得计数递减为 0。

解决方案

使用 while 而不是 if 保证消费者 2 将重新检查队列是否为空,这保证 count >= 0

while (count == 0){
    empty.await();
}

另外,最好用 produce 方法做同样的事情:

while (count.equals(maxLength)){
    log.info("The queue is full!");
    full.await();
}

关于java - 为什么自定义阻塞队列在 Java 中不是线程安全的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52832144/

相关文章:

javascript - javascript中的哈希表和 HashMap

Java 链接 HashMap : what's the difference in these two?

java - 将 thrift 与 PHP 和 Java 结合使用

java - 如何根据线程的名称或 ID 向线程发送通知?是否可以?

Python 共享队列 - 2 个不同的线程

Java ExecutionrCompletionService take.get() 从不同的线程

java - 有没有办法在 JPanel 上跟踪矩形(如第三人称)?

java - android java线程中的新ArrayAdapter

c++ - 如何在不终止程序的情况下停止 std::thread 的运行

java - RxJava方法从数据库读取多个条目并将它们写入多个文件