我正在尝试创建一个 SingleBlockingQueue<T>
同步器,允许一个线程 offer()
一个元素,另一个线程将 take()
它。只有一个T
元素保存在 SingleBlockingQueue<T>
内一次,推送线程被阻塞在 offer()
如果前一个元素正在等待获取线程 take()
它。推送线程将继续推送项目,直到调用 setComplete()
,并且获取线程将继续调用take()
而isComplete()
是假的。如果获取线程正在等待元素,则该线程将被阻塞。
这是我迄今为止得到的同步器。
import java.util.concurrent.atomic.AtomicBoolean;
public final class SingleBlockingQueue<T> {
private volatile T value;
private final AtomicBoolean isComplete = new AtomicBoolean(false);
private final AtomicBoolean isPresent = new AtomicBoolean(false);
public void offer(T value) throws InterruptedException {
while (isPresent.get()) {
this.wait();
}
this.value = value;
synchronized(this) {
this.notifyAll();
}
}
public boolean isComplete() {
return !isPresent.get() && isComplete.get();
}
public void setComplete() {
isComplete.set(true);
}
public T take() throws InterruptedException {
while (!isPresent.get()) {
this.wait();
}
T returnValue = value;
isPresent.set(false);
synchronized(this) {
this.notifyAll();
}
return returnValue;
}
}
这是 Kotlin 中的使用示例
val queue = SingleBlockingQueue<Int>()
thread {
for (i in 1..1000) {
queue.offer(i)
}
queue.setComplete()
}
thread {
while (!queue.isComplete) {
println(queue.take())
}
}
Thread.sleep(100000)
但是,我遇到了一个错误,此时我有点不知所措。感谢 RxJava,我已经很长时间没有制作同步器了。我到底做错了什么?
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29)
at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33)
at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18)
最佳答案
您不需要自己实现它,您可以使用SynchronousQueue
引用文献:
http://tutorials.jenkov.com/java-util-concurrent/synchronousqueue.html
The SynchronousQueue class implements the BlockingQueue interface. Read the BlockingQueue text for more information about the interface.
The SynchronousQueue is a queue that can only contain a single element internally. A thread inseting an element into the queue is blocked until another thread takes that element from the queue. Likewise, if a thread tries to take an element and no element is currently present, that thread is blocked until a thread insert an element into the queue.
关于java - 创建 SingleBlockingQueue 同步器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37446814/