java - 如何正确让多个线程访问共享缓冲区

标签 java concurrency deadlock synchronizedcollection

我有一个生产者/消费者的情况,生产者生产域供消费者访问。消费者发送 https 请求并从页面获取链接并将其提交回生产者。当生产者完成时,消费者不会完成并卡在最终域上。我一生都无法弄清楚为什么会发生这种情况。

I have simplified my question

主要:

public class Main {

    public static void main(String[] args) throws InterruptedException {
        try
        {
            Broker broker = new Broker();

            ExecutorService threadPool = Executors.newFixedThreadPool(3);


            threadPool.execute(new Consumer(broker));
            threadPool.execute(new Consumer(broker));
            Future producerStatus = threadPool.submit(new Producer(broker));

            // this will wait for the producer to finish its execution.
            producerStatus.get();


            threadPool.shutdown();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }

    }

}

经纪人:

public class Broker {
    private BlockingQueue<String> QUEUE = new LinkedBlockingQueue<String>();
    public Boolean continueProducing = Boolean.TRUE;

     public void put(String data) throws InterruptedException
     {
            this.QUEUE.put(data);
     }

     public String get() throws InterruptedException
     {
            //return this.queue.poll(1, TimeUnit.SECONDS);
            return this.QUEUE.take();
     }
}

消费者:

public class Consumer implements Runnable{

    private Broker broker;


    public Consumer(Broker broker) {
        this.broker = broker;
    }

    @Override
    public void run() {


        try {
            String data = broker.get();
            while (broker.continueProducing || data != null)
            {
                Thread.sleep(1000);
                System.out.println("Consumer " + Thread.currentThread().getName() + " processed data from broker: " + data);
                data = broker.get();
            }

            System.out.println("Comsumer " + Thread.currentThread().getName() + " finished its job; terminating.");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }


    }


}

制作人:

public class Producer implements Runnable{

    private Broker broker;


    public Producer(Broker broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try
        {
            for (int i = 0; i < 2; ++i) {
                System.out.println("Producer produced: " + "https://example" + i + ".com");
                Thread.sleep(100);
                broker.put("https://example" + i + ".com");
            }

            //broker.put("https://example.com/2");
            this.broker.continueProducing = Boolean.FALSE;
            System.out.println("Producer finished its job; terminating.");
        }catch(Exception e) 
        {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }

    }

}

最佳答案

更新的答案:

当我运行你的代码时,消费者卡在了data =broker.get()行上。代理正在调用 BlockingQueue.take 方法。这是此方法的 Javadoc(重点是我的):

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

这意味着即使生产者没有生产任何东西,消费者仍然会等待生产一些东西。

一种可能的解决方案是使用“poison pill ”方法。假设您只有一个生产者,您的 Broker 类可能如下所示:

public class Broker {

  private static final String POISON_PILL = "__POISON_PILL__";
  private BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public void put(String data) {
    queue.add(data);
  }

  public void doneProducing() {
    queue.add(POISON_PILL);
  }

  public String get() throws InterruptedException {
    String result = queue.take();
    if (result.equals(POISON_PILL)) {
      queue.add(POISON_PILL);
      return null;
    } else {
      return result;
    }
  }

}
<小时/>

先前代码的答案:

如果您可以缩小这个问题的范围,使其仅包含最少量的代码来解决死锁,那就太好了。目前,您发布的许多代码是不相关的,还有一些相关的代码您没有发布。

此外,您当前的代码存在很多问题。您的 toLinkedHashSet 方法无法编译。在您的 add 方法中,您正在调用 BlockingQueue.put 方法,即使您的 BlockingQueue 永远不会达到其限制。您声称需要 Ο(1) 时间来执行 contains,但您的代码有 Ο(n) 时间。您似乎还在 addAllcontains 方法中进行了大量不必要的复制。

这里没有足够的信息让我知道问题是什么,但可能导致您的问题的一件事是您的 get 方法。如果使用者线程被中断,那么您的 get 方法将导致它自行中断(这可能不会导致死锁,但可能看起来像死锁)。在 Java 中,忽略异常是很难接受的。如果您调用 take 方法抛出 InterruptedException,这是有原因的:另一个线程希望当前线程停止。您的 get 方法应该抛出 InterruptedException。例如:

public String get() throws InterruptedException {
    return unprocessed.take();
}

如果您确实需要 get 方法不抛出 InterruptedException,您可以抛出一些其他包含 InterruptedException 的链式异常。如果在中断时返回 "" 确实合适,您可以执行以下操作:

public String get() {
  try {
    return unprocessed.take();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return "";
  }
}

通过中断当前线程,您可以确保至少当前线程被标记为已中断,以便后续的某些操作可以处理它。但如果可能的话,抛出 InterruptedException 可能是最合适的。

我仍然不明白为什么您要为 LinkedBlockingQueue 创建自己的包装器,而不是仅使用 LinkedBlockingQueue 本身。看起来您在 LinkedBlockingQueue 之上添加的所有内容除了减慢速度之外什么也没做。

关于java - 如何正确让多个线程访问共享缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51495340/

相关文章:

c++ - 如何使用 libpqxx API 同时插入数据? (PostgreSQL,线程)

Python urllib2.urlopen() 很慢,需要一个更好的方法来读取多个 url

c++ - 是否可以使用互斥锁来锁定 vector 中的元素而不是整个 vector ?

java - Spring:检查给定端点是否存在

java - 代码的 “side-effects”到底是什么意思?

go - Effective Go中的客户请求处理程序示例导致死锁?

C++:std::async 和 std::mutex 在 Linux 上导致死锁但在 Windows 上运行?

java - 如何判断一个大数是偶数还是奇数?

java - jdk7合规级别: how to migrate code source from 6 to 7

java - 这在生产服务器上安全吗?