我已经在我的应用程序中实现了发布和订阅模式,但是当我在任何一个订阅者中调用 Thread.sleep()
方法或我的任何一个订阅者抛出异常时,所有其他订阅者和发布者都会受到影响通过这个,我怎样才能防止这种情况发生。
我已经为上述问题创建了一个小演示
发布者代码
import java.util.Random;
public class Publisher extends Thread {
Broker broker = Broker.getInstance();
Random random = new Random();
@Override
public void run() {
while (true) {
System.out.println("Published " + new Timestamp(System.currentTimeMillis()));
broker.updateSubscribers(Integer.toString(random.nextInt(250)));
}
}
}
订阅者界面
public interface Subscriber {
public void onUpdate(String message);
}
消息订阅者代码
import java.util.logging.Level;
import java.util.logging.Logger;
public class MessageSubscriber extends Thread implements Subscriber {
Broker broker = Broker.getInstance();
@Override
public void run() {
System.out.println("MessageSubscriber started...");
broker.subscribe(this);
}
@Override
public void onUpdate(String message) {
try {
System.out.println(message);
sleep(1000); // called sleep affects the publisher too
} catch (InterruptedException ex) {
Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
如您所见,我在 MessageSubscriber 中调用了 sleep 方法,这也会影响 Publisher 并使其在这段时间内也处于 sleep 状态
编辑添加的经纪人代码
import java.util.ArrayList;
import java.util.List;
/**
*
* @author hemants
*/
public class Broker {
List<Subscriber> subscribersList = new ArrayList<>();
private Broker() {
}
public static Broker getInstance() {
return BrokerHolder.INSTANCE;
}
private static class BrokerHolder {
private static final Broker INSTANCE = new Broker();
}
public void subscribe(Subscriber s) {
subscribersList.add(s);
}
public void unsubscribe(Subscriber s) {
subscribersList.remove(s);
}
public void updateSubscribers(String message) {
subscribersList.stream().forEach(subscriber -> subscriber.onUpdate(message));
}
}
运行上面代码的主类
public class PubSubPattern {
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.start();
MessageSubscriber messageSubscriber = new MessageSubscriber();
messageSubscriber.start();
}
}
好吧,我已经像下面这样编辑了我的 MessageSubscribe 代码,它正在做我预期的事情
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
* @author hemants
*/
public class MessageSubscriber extends Thread implements Subscriber {
Broker broker = Broker.getInstance();
@Override
public void run() {
System.out.println("MessageSubscriber started...");
while (true) {
try {
broker.subscribe(this);
System.out.println("subscribed ");
sleep(1000);
broker.unsubscribe(this);
System.out.println("un subscribed");
sleep(1000);
} catch (InterruptedException ex) {
Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
@Override
public void onUpdate(String message) {
System.out.println(message);
}
}
你怎么看这个
最佳答案
所以你执行这样的事情
subscribersList.stream().forEach(subscriber -> subscriber.onUpdate(message));
并且在onUpdate
期间你 sleep
所以它有效
subscribersList.stream().forEach(subscriber -> Thread.sleep());
甚至更冗长
for(Subscriber sub:subscribers){
Thread.sleep(xxx);
}
难怪它会“影响”其他听众,因为调用者在这里被阻止了。调用者线程在每个元素上 hibernate 。
要么使用线程池提交更新任务,要么使用subscribersList.parallelStream()
我希望这仅用于教育目的。
关于java - 在订阅者线程中调用 Thread.Sleep 会导致发布者线程 hibernate ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51532046/