java - 如何在生产者-消费者模型中正确使用SynchronousQueue?

标签 java multithreading producer-consumer java.util.concurrent

我编写了一个在生产者-消费者模型中使用 SynchronousQueue 的测试示例。但效果并不好。以下是我的代码:

public class QueueTest {

    String input;
    int pos;
    BlockingQueue<String> queue;
    volatile boolean exitFlag;

    QueueTest()
    {
        for(int i=0; i<10000; i++)
            input += "abcde";
        input += "X";
        pos = 0;
        queue = new SynchronousQueue<String>();
        exitFlag = false;
    }

    public static void main(String[] args) {
        QueueTest qtest = new QueueTest();
        qtest.runTest();
    }

    void runTest()
    {
        Thread producer = new Thread( new Producer());
        Thread consumer = new Thread( new Consumer());
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    class Producer implements Runnable
    {
        public void run()
        {
            while(true)
            {
                String s = read();
                if(s.equals("X"))
                    break;
                try {
                    queue.put(s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            exitFlag = true;
        }
    }

    class Consumer implements Runnable
    {
        public void run()
        {
            while(exitFlag == false)
            {
                String s = null;
                try {
                    s = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                process(s);  
            }
        }
    }

    String read()
    {
        String str = input.substring(pos, pos+1);
        pos++;
        return str;
    }

    void process(String s)
    {
        long sum = 0;
        for(long i=0; i<1000; i++)
            sum = sum * i + i;
    }
}

问题是运行像死锁一样卡住。这些简单的代码有什么bug吗?

最佳答案

您很可能会看到竞争状况。想象一下场景

Thread 1 put into queue
Thread 2 takes out of queue quickly processes and awaits another put from thread 1
Thread 1 finishes and sets exitFlag to true

在这种情况下,线程 2 将永久处于 hibernate 状态,因为在线程 2 读取之前 exitFlag 未设置为 false。

您可能需要考虑毒丸。这是我们已完成的发送给其他线程的消息。例如:

   final String POISON_PILL = " POISON";

class Producer implements Runnable {
    public void run() {
        while (true) {
            String s = read();
            if (s.equals("X"))
                break;
            try {
                queue.put(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            queue.put(POISON_PILL);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    public void run() {
        String s = null;
        try {
            while ((s = queue.take()) != POISON_PILL) {
                process(s);
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

因此,当另一个线程收到通知时,另一个线程已完成,两个线程都应该正常结束。

关于java - 如何在生产者-消费者模型中正确使用SynchronousQueue?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11959030/

相关文章:

java - Netty 客户端相对于 I/O 客户端的优势?

ios - 核心数据将 objectID 传递给另一个线程

java - 使用Thread查找2个键的最小距离

c# - 确定性监视器脉冲/等待并在生产者-消费者收集中实现超时

task-parallel-library - TPL 默认构造函数 BufferBlock : Value of DataFlowBlockOptions

使用 PThreads 的 C 生产者-消费者

java - J2ME读取文本文件

java - 运行服务两次

java - Kotlin - 继承自实现 Parcelable 的 Java 类

java - 无法让 2 个客户端监听一个 UDP 服务器