java - 创建 SingleBlockingQueue 同步器

标签 java multithreading concurrency

我正在尝试创建一个 SingleBlockingQueue<T>同步器,允许一个线程 offer()一个元素,另一个线程将 take()它。只有一个T元素保存在 SingleBlockingQueue<T> 内一次,推送线程被阻塞在 offer()如果前一个元素正在等待获取线程 take()它。推送线程将继续推送项目,直到调用 setComplete() ,并且获取线程将继续调用take()isComplete()是假的。如果获取线程正在等待元素,则该线程将被阻塞。

这是我迄今为止得到的同步器。

import java.util.concurrent.atomic.AtomicBoolean;

public final class SingleBlockingQueue<T> {

    private volatile T value;
    private final AtomicBoolean isComplete = new AtomicBoolean(false);
    private final AtomicBoolean isPresent =  new AtomicBoolean(false);

    public void offer(T value) throws InterruptedException {
        while (isPresent.get()) {
            this.wait();
        }
        this.value = value;
        synchronized(this) {
            this.notifyAll();
        }
    }
    public boolean isComplete() {
        return !isPresent.get() && isComplete.get();
    }
    public void setComplete() {
        isComplete.set(true);
    }
    public T take() throws InterruptedException {
        while (!isPresent.get()) {
            this.wait();
        }
        T returnValue = value;
        isPresent.set(false);
        synchronized(this) {
            this.notifyAll();
        }
        return returnValue;
    }
}

这是 Kotlin 中的使用示例

    val queue = SingleBlockingQueue<Int>()

    thread {
        for (i in 1..1000) {
            queue.offer(i)
        }
        queue.setComplete()
    }

    thread {
        while (!queue.isComplete) {
            println(queue.take())
        }
    }

    Thread.sleep(100000)

但是,我遇到了一个错误,此时我有点不知所措。感谢 RxJava,我已经很长时间没有制作同步器了。我到底做错了什么?

Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29)
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33)
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8)
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18)

最佳答案

您不需要自己实现它,您可以使用SynchronousQueue

引用文献:

SynchronousQueue javadoc

http://tutorials.jenkov.com/java-util-concurrent/synchronousqueue.html

The SynchronousQueue class implements the BlockingQueue interface. Read the BlockingQueue text for more information about the interface.

The SynchronousQueue is a queue that can only contain a single element internally. A thread inseting an element into the queue is blocked until another thread takes that element from the queue. Likewise, if a thread tries to take an element and no element is currently present, that thread is blocked until a thread insert an element into the queue.

关于java - 创建 SingleBlockingQueue 同步器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37446814/

相关文章:

java - 当我将扫描仪对象分配给变量时程序崩溃

java - 更改背景颜色时 JFrame 闪烁

java - CyclicBarrier:导致屏障跳闸的 'x' 个线程中的 'y' 完成执行并终止

asp.net - 防止用户在同一行上工作

java - 比较 List<Map<String, String>> 和 List<Map<String, Object>> Java

java - 如何使用 Spring-mvc 修复 "Mail server connection failed; nested exception is javax.mail.MessagingException: Could not convert socket to TLS; "?

java - 不能使用 java.util.regex.Pattern 即使我在 eclipse 中使用 jre8

multithreading - 使用 CLR boost 线程

python - 如何将 joblib 并行化与不返回任何内容的类内方法一起使用

java - 当线程池大小小于执行的任务数时,使用 newFixedThreadPool 的多线程程序不会正常运行