java - LinkedBlockingQueue put 和 take 功能实现

标签 java multithreading concurrency queue blockingqueue

我正在经历 LinkedBlockingQueue 的内部实现 put(E e) 和 take() 函数。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            **notEmpty.signal();**
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

  public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                **notFull.signal();**
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

这两种方法都没有明白为什么在与容量比较后调用条件下的 signal() 。 如果有人能解释一下,将不胜感激。

最佳答案

这是我可以考虑使用的一种情况:

if (c > 1)
    notEmpty.signal();

假设队列为空,有3个线程,thread_1、thread_2、thread_3。

  • thread_1 调用 take(),在 notEmpty.await() 处被阻塞。
  • thread_2 调用 take(),在 notEmpty.await() 处被阻塞。
  • thread_3 调用 take(),在 notEmpty.await() 处被阻塞。

然后,还有另外 3 个线程,thread_4、thread_5、thread_6。

  • thread_4 调用 put(),在队列中添加一个元素,并向 thread_1 发送信号
  • thread_1 唤醒,重新获取 takeLock,并尝试获取第一个元素。
  • 同时,thread_5 调用 put(),向队列中添加另一个元素,并向 thread_2 信号
  • thread_2 唤醒,尝试重新获取 takeLock,但 thread_1 现在持有该锁,因此必须等待。
  • thread_6 中断 thread_2,thread_2 抛出 InterruptedException 并终止。
  • thread_1 获取第一个元素。但是,该队列中还有另一个元素,因此出现了上面代码中的c > 1。如果thread_1不调用signal,thread_3就无法唤醒并获取第二个元素。
  • thread_1 调用 signal,thread_3 唤醒,并获取第二个元素。

关于java - LinkedBlockingQueue put 和 take 功能实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49940745/

相关文章:

java - click方法中的动态按钮和getTag

java - 错误在 tomcat 上部署期间找不到键 log4j.appender.error 的值

java - 线程内线程的优先级

Java 并发 : Run until terminated

java - 如何将数学答案四舍五入到小数点后 5 位

Java JTable 和 JToolBar 奇怪地调整大小

java - Java中的读/写锁实现

C++ boost 线程延迟

java - 如果线程花费太长时间,如何结束执行程序服务中的线程?

java - Observable|观察者模式的基本实现