java - Java中的生产者消费者

标签 java producer-consumer blockingqueue

我写了下面的代码,但我觉得我哪里出错了:

public class ProcessQueue {

static BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

public ProcessQueue() {
    process();
}

public void add(String message) throws InterruptedException {
    System.out.println("Added Queue size:" + queue.size());
    System.out.println("Locked by Producer");
    queue.put(message);
    System.out.println("Lock Released by Producer");
}

public static void process() {
    new Thread() {

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println("Locked by Consumer");
                    Message.send(queue.take());
                    System.out.println("Locked Released by Consumer");
                    System.out.println("Consuming Queue size:" + queue.size());
                }
            } catch (Exception ex) {
                System.out.print(ex.getMessage());
            }
        }
    }.start();
}
}

此处 add(String) 将字符串添加到队列中。只要它从 UDP 端口接收到输入,就会调用它。 process() 处理队列并将其发送给类 Message 进行处理。输出 Locked 和 Released Print Statements 的顺序不符合要求。

编辑

我期望的答案应该是: 如果它在添加的生产者中,则由生产者锁定 -> 然后添加到队列 -> 锁定释放。同样的方式也适用于消费者。但是操作不应该交错,即一旦打印出生产者锁定,它不应该打印出消费者锁定然后释放锁。

最佳答案

这里唯一会发生阻塞的时间是队列为空时。否则看跌期权将继续发生。所以你可能会看到队列的大小没有增加 1。您可能希望对 LinkedBlockingQueue 设置一个界限。仅供引用,LBQ 默认是无界的

根据您的编辑进行编辑:

到目前为止,我的回答是解释您所看到的内容及其原因。您正在寻找同步消息传递队列。您可以使用以下方法执行此操作:

new SynchrnousQueue();
new LinkedBlockingQueue(1);
new ArrayBlockingQueue(1);
new TransferQueue();

SynchhrnousQueue 完全符合您的要求。边界为 1 的 Linked&ArrayBlockingQueue 几乎做同样的事情。 TransferQueue 是 Java 7 中提供的一个新队列,它具有等待线程准备好获取数据的 transfer 方法。

关于java - Java中的生产者消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10145813/

相关文章:

python - "closable"队列的数据类型,用于处理多个生产者和消费者的项目流

java - 使用java中的任务执行器将1000万条记录插入数据库

Java:区分InputStream中的输入

java - java中生产者消费者使用队列

java - Elasticsearch 范围日期

java - 在 Apache Kafka 中设置多个分区

java - JMSException InterruptedIOException - 生产者线程被中断

Java BlockingQueue take() 与 poll()

java - Netty TCP 套接字开销

java - 5 个持久类的非持久父类(super class),具有/相同的 3 个有时持久的字段?