java - 这个 BoundedBuffer 类的问题出在哪里?

标签 java concurrency

我正在尝试构建的有界缓冲区类的属性...

  • 多个生产者,多个消费者。
  • 阻止生产者和阻止消费者。
  • 使用 AtomicInteger 作为读/写指针。
  • 使用 AtomicReferenceArray(采用通用类型)来保存缓冲区。
  • 缓冲区的大小为 Short.MAX_VALUE,它使用 CAS 来处理溢出。

现在解决问题...

问题:我似乎无法注释掉下面代码中的synchronized(this) block 。我认为使用 AtomicInteger 作为指针的全部目的是避免这样做。

注释掉synchronized(this) block 会导致消费者丢失生产者放入的一些项目。如果我包含synchronized(this) block ,一切都很好,并且生成的每一个东西都会被消耗。

我错过了什么?

public class BoundedBuffer<T> {
    private static final int BUFFER_SIZE = Short.MAX_VALUE+1;
    private AtomicReferenceArray<T> m_buffer = null;
    private Semaphore m_full = new Semaphore(BUFFER_SIZE);
    private Semaphore m_empty = new Semaphore(0);
    private AtomicInteger m_writePointer = new AtomicInteger();
    private AtomicInteger m_readPointer = new AtomicInteger();

    public BoundedBuffer() {
        m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
    }

    public static int safeGetAndIncrement(AtomicInteger i) {
        int oldValue = 0, newValue = 0;
        do {
            oldValue = i.get();
            newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
        } while (!i.compareAndSet(oldValue, newValue));
        return oldValue;
    }

    public void add(T data) throws InterruptedException {
        m_full.acquire();
        synchronized (this) { // << Commenting this doesn't work
            // CAS-based overflow handling
            m_buffer.set(safeGetAndIncrement(m_writePointer),data);
        }
        m_empty.release();
    }

    public T get() throws InterruptedException {
        T data = null;
        m_empty.acquire();
        synchronized (this) { // << Commenting this doesn't work
            // CAS-based overflow handling
            data = m_buffer.get(safeGetAndIncrement(m_readPointer));
        }
        m_full.release();
        return data;
    }
}

最佳答案

可能存在一个问题,即删除同步块(synchronized block)时,数组中的 get() 与增量不是原子的。我推测的破坏场景要求生产者超越消费者,那么如果信号量释放是由乱序读取触发的,那么您可以让生产者覆盖尚未读取的数组条目。

考虑缓冲区已满的情况(写入器索引位于 N,读取器索引位于 N+1)并且 2 个线程正在尝试从缓冲区读取。 (为简单起见,假设 N 不接近环绕点。)

线程 1 接收索引 N+1,从中读取其项目。

线程 2 接收索引 N+2,从中读取其项目。

由于调度的偶然性,线程 2 首先从缓冲区数组获取并在线程 1 从数组获取其项目之前释放 m_full 信号量。

线程 3(生产者)醒来并将一个项目写入缓冲区中的下一个可用槽 N+1,也是在线程 1 从缓冲区读取数据之前。

线程 1 然后获取索引 N+1 处的项目,但错过了它想要的项目。

关于java - 这个 BoundedBuffer 类的问题出在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7375310/

相关文章:

Java 8 可选——如何处理嵌套的对象结构

javascript - 限制Node js中的Q promise 并发

java - ConcurrentHashMap 和跨越 2 个单独调用的操作

java - 从 URL 读取图像

concurrency - Scrapy 并发或分布式抓取

android - 使用 Sprite 数组列表时出现并发错误

perl DBI 和准备好的语句名称冲突

java - 从另一个号码获取不太具体的号码

java - Excel 中的 DDE 信息

java - 正确使用 JRSwapFileVirtualizer?