java - 在订阅者线程中调用 Thread.Sleep 会导致发布者线程 hibernate

标签 java multithreading design-patterns

我已经在我的应用程序中实现了发布和订阅模式,但是当我在任何一个订阅者中调用 Thread.sleep() 方法或我的任何一个订阅者抛出异常时,所有其他订阅者和发布者都会受到影响通过这个,我怎样才能防止这种情况发生。

我已经为上述问题创建了一个小演示

发布者代码

import java.util.Random;

public class Publisher extends Thread {

    Broker broker = Broker.getInstance();
    Random random = new Random();

    @Override
    public void run() {
        while (true) {
            System.out.println("Published " + new Timestamp(System.currentTimeMillis()));
            broker.updateSubscribers(Integer.toString(random.nextInt(250)));
        }

    }
}

订阅者界面

public interface Subscriber {

    public void onUpdate(String message);
}

消息订阅者代码

import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageSubscriber extends Thread implements Subscriber {

    Broker broker = Broker.getInstance();

    @Override
    public void run() {
        System.out.println("MessageSubscriber started...");
        broker.subscribe(this);
    }

    @Override
    public void onUpdate(String message) {
        try {
            System.out.println(message);
            sleep(1000);                    // called sleep affects the publisher too
        } catch (InterruptedException ex) {
            Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

}

如您所见,我在 MessageSubscriber 中调用了 sleep 方法,这也会影响 Publisher 并使其在这段时间内也处于 sleep 状态

编辑添加的经纪人代码

import java.util.ArrayList;
import java.util.List;

/**
 *
 * @author hemants
 */
public class Broker {

    List<Subscriber> subscribersList = new ArrayList<>();

    private Broker() {
    }

    public static Broker getInstance() {
        return BrokerHolder.INSTANCE;
    }

    private static class BrokerHolder {

        private static final Broker INSTANCE = new Broker();
    }

    public void subscribe(Subscriber s) {
        subscribersList.add(s);
    }

    public void unsubscribe(Subscriber s) {
        subscribersList.remove(s);
    }

    public void updateSubscribers(String message) {
        subscribersList.stream().forEach(subscriber -> subscriber.onUpdate(message));
    }
}

运行上面代码的主类

public class PubSubPattern {

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.start();

        MessageSubscriber messageSubscriber = new MessageSubscriber();
        messageSubscriber.start();
    }
}

好吧,我已经像下面这样编辑了我的 MessageSubscribe 代码,它正在做我预期的事情

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author hemants
 */
public class MessageSubscriber extends Thread implements Subscriber {

    Broker broker = Broker.getInstance();

    @Override
    public void run() {
        System.out.println("MessageSubscriber started...");

        while (true) {
            try {
                broker.subscribe(this);
                System.out.println("subscribed ");
                sleep(1000);
                broker.unsubscribe(this);
                System.out.println("un subscribed");
                sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    @Override
    public void onUpdate(String message) {
        System.out.println(message);
    }

}

你怎么看这个

最佳答案

所以你执行这样的事情

subscribersList.stream().forEach(subscriber -> subscriber.onUpdate(message));

并且在onUpdate期间你 sleep

所以它有效

subscribersList.stream().forEach(subscriber -> Thread.sleep());

甚至更冗长

for(Subscriber sub:subscribers){
   Thread.sleep(xxx);
}

难怪它会“影响”其他听众,因为调用者在这里被阻止了。调用者线程在每个元素上 hibernate 。

要么使用线程池提交更新任务,要么使用subscribersList.parallelStream()

我希望这仅用于教育目的。

关于java - 在订阅者线程中调用 Thread.Sleep 会导致发布者线程 hibernate ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51532046/

相关文章:

c++ - par_unseq 和 "vectorization-unsafe"函数

java - 使用依赖注入(inject) w/guice 的可浏览源项目?

java - 无法使用 iText PdfDiv 绝对位置

java - Spring Cloud Sleuth v2.2.3 如何将 TraceId 传播到另一个线程

java - 运行时出错 'Clojure REPL' : Error creating RunProfileState

c++ - 使用 QT + OpenCV 的多线程

design-patterns - 我们是否有效地使用了 IoC?

c# - 一种类似于 CancellationTokenSource 和 CancellationToken 模式的暂停线程模式?

java - 如何使用 JSch ssh 到 ZXA10 C320 设备

java - 测试所有级别的访问修饰符控制