我正在尝试构建的有界缓冲区类的属性...
- 多个生产者,多个消费者。
- 阻止生产者和阻止消费者。
- 使用 AtomicInteger 作为读/写指针。
- 使用 AtomicReferenceArray(采用通用类型)来保存缓冲区。
- 缓冲区的大小为 Short.MAX_VALUE,它使用 CAS 来处理溢出。
现在解决问题...
问题:我似乎无法注释掉下面代码中的synchronized(this) block 。我认为使用 AtomicInteger 作为指针的全部目的是避免这样做。
注释掉synchronized(this) block 会导致消费者丢失生产者放入的一些项目。如果我包含synchronized(this) block ,一切都很好,并且生成的每一个东西都会被消耗。
我错过了什么?
public class BoundedBuffer<T> {
private static final int BUFFER_SIZE = Short.MAX_VALUE+1;
private AtomicReferenceArray<T> m_buffer = null;
private Semaphore m_full = new Semaphore(BUFFER_SIZE);
private Semaphore m_empty = new Semaphore(0);
private AtomicInteger m_writePointer = new AtomicInteger();
private AtomicInteger m_readPointer = new AtomicInteger();
public BoundedBuffer() {
m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
}
public static int safeGetAndIncrement(AtomicInteger i) {
int oldValue = 0, newValue = 0;
do {
oldValue = i.get();
newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
} while (!i.compareAndSet(oldValue, newValue));
return oldValue;
}
public void add(T data) throws InterruptedException {
m_full.acquire();
synchronized (this) { // << Commenting this doesn't work
// CAS-based overflow handling
m_buffer.set(safeGetAndIncrement(m_writePointer),data);
}
m_empty.release();
}
public T get() throws InterruptedException {
T data = null;
m_empty.acquire();
synchronized (this) { // << Commenting this doesn't work
// CAS-based overflow handling
data = m_buffer.get(safeGetAndIncrement(m_readPointer));
}
m_full.release();
return data;
}
}
最佳答案
可能存在一个问题,即删除同步块(synchronized block)时,数组中的 get() 与增量不是原子的。我推测的破坏场景要求生产者超越消费者,那么如果信号量释放是由乱序读取触发的,那么您可以让生产者覆盖尚未读取的数组条目。
考虑缓冲区已满的情况(写入器索引位于 N,读取器索引位于 N+1)并且 2 个线程正在尝试从缓冲区读取。 (为简单起见,假设 N 不接近环绕点。)
线程 1 接收索引 N+1,从中读取其项目。
线程 2 接收索引 N+2,从中读取其项目。
由于调度的偶然性,线程 2 首先从缓冲区数组获取并在线程 1 从数组获取其项目之前释放 m_full
信号量。
线程 3(生产者)醒来并将一个项目写入缓冲区中的下一个可用槽 N+1,也是在线程 1 从缓冲区读取数据之前。
线程 1 然后获取索引 N+1 处的项目,但错过了它想要的项目。
关于java - 这个 BoundedBuffer 类的问题出在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7375310/