我刚刚开始为学术目的编写一个简单的多线程应用程序。
我按照主管的指定将代码分为 3 类。
ResponseQueue:包含一个队列和 2 个插入/删除方法。
发送者:将尝试发送,接收者将接收消息。
包含 id 和响应(字符串)的 Message 类。
使用 Junit 测试来查看代码是否有效:“TestSendReceiveFromResponseQueue”
代码如下:
public class ResponseQueue {
private ResponseQueue() {
}
private BlockingQueue<Message> queue = new SynchronousQueue<Message>();
public synchronized Message poll() {
Log log = LogFactory.getLog(this.getClass());
Message response = null;
try {
response = queue.take();
if (response != null) {
return response;
} else {
log.error("null");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
public synchronized boolean push(Message response) {
Log log = LogFactory.getLog(this.getClass());
System.out.println("tryin");
try {
queue.put(response);
return true;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}
private static class LazyHolder {
private static final ResponseQueue INSTANCE = new ResponseQueue();
}
public static ResponseQueue getInstance() {
return LazyHolder.INSTANCE;
}
}
public class TestSendReceiveFromResponseQueue extends TestCase{
ResponseQueue respQueue;
@Override
protected void setUp() throws Exception {
super.setUp();
respQueue= ResponseQueue.getInstance();
}
class Receiver implements Runnable {
Message message;
ArrayList<Message> messageList = new ArrayList< Message>();
@Override
public void run() {
message = null;
while(true){
message = respQueue.poll();
messageList.add(message);
}
}
}
class Sender implements Runnable {
@Override
public void run() {
Message msg = new Message();
msg.setResponse("test");
respQueue.push(msg);
}
}
public void test() {
Receiver r = new Receiver();
new Thread(r).start();
Sender s = new Sender();
new Thread(s).start();
assertEquals(false, r.messageList.isEmpty());
}
}
我不知道为什么它总是给出 nullpointerException 或无限循环。 欢迎任何帮助。
编辑:由于有用的答案更正了代码,但它仍然总是给我错误。 谢谢。
最佳答案
在 ResponseQueue.push 中:
if(!queue.isEmpty()) {
done = true;
return true;
}
有可能您的接收者线程在将消息放入队列后立即抓取消息,并且当发送者进行此检查时,队列再次为空,导致它循环并重新发布消息。
假设您打算让发件人等待收到消息,则应删除此检查,并且您应替换上面调用的行
queue.add
与 queue.put
,因为这正是以正确的线程安全方式为您进行检查。
您的接收者还应该使用 queue.take
而不是 queue.poll
,take
方法将等待消息可用,以便您可以删除循环检查消息是否为空。
最后,您的接收器正在运行无限 while(true)
循环,一旦收到消息,它所做的第一件事就是返回循环并再次将消息设置为 null。这可能是在测试有机会检查消息字段之前发生的,因此您的测试最终陷入无限循环。
一个简单的解决方案是删除 message = null;
行,因为您不再需要检查是否使用 queue.take
。
public void run() {
while(true) {
message = respQueue.take(); //This will wait for a message
}
}
(请注意,尽管仍然存在竞争条件,在测试查看前一条消息之前可能会收到新消息。此处的解决方法可能是将收到的消息存储在列表中。)
关于java - 简单发送/接收多线程总是给出空异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32306970/