我有一个主客户端,它为每个对等点保留后台计时器。这些计时器在后台线程中运行,并计划在 30 秒(超时时间)内执行将相应对等点标记为离线的任务。执行此操作的代码块是:
public void startTimer() {
timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
status = false;
System.out.println("Setting " + address.toString() + " status to offline");
// need to send failure message somehow
thread.sendMessage();
}
}, 5*1000);
}
然后,在主程序中,我需要一些方法来检测上面的计时器任务何时运行,以便主客户端可以向所有其他对等点发送一个failure
消息,比如:
while (true)
if (msgFromThreadReceived)
notifyPeers();
我如何使用 TimerTask 完成此任务?据我了解,计时器在单独的线程中运行,我想以某种方式将消息传递给主线程以通知主线程任务已运行。
最佳答案
我会让处理对等点计时器的类采用并发队列,并在对等点离线时将消息放入队列中。然后“主”线程可以以事件驱动的方式轮询队列,接收和处理消息。
请注意,这个“主”线程不能是 GUI 框架的事件分发线程。如果主线程收到消息时GUI中有什么需要更新的,它可以在收到消息后调用事件派发线程上的另一段代码。
如果队列应该是无界的(计时器线程可以在主线程提取消息之前将任意数量的消息放入队列中),那么队列的两个不错的选择是ConcurrentLinkedQueue
,或者LinkedBlockingQueue
如果队列的大小应该有限制,如果它变得太大,计时器线程必须等待才能将另一条消息放入其中(这称为背压,并且可能很重要在分布式并发系统中,但可能与您的情况无关)。
这里的想法是实现一个版本的 Actor 模型 (q.v.),其中线程(actor)之间不共享任何内容,并且需要发送的任何数据(应该是不可变的)在它们之间传递。每个参与者都有一个收件箱,它可以在其中接收消息并根据消息采取行动。只是,您的计时器线程可能不需要收件箱,如果它们将所有数据作为构造函数的参数并且在启动后不需要从主线程接收任何消息。
public record PeerDownMessage(String peerName, int errorCode) {
}
public class PeerWatcher {
private final Peer peer;
private final BlockingQueue<PeerDownMessage> queue;
public PeerWatcher(Peer peer, BlockingQueue<PeerDownMessage> queue) {
this.peer = Objects.requireNonNull(peer);
this.queue = Objects.requireNonNull(queue);
}
public void startTimer() {
// . . .
// time to send failure message
queue.put(new PeerDownMessage(peer.getName(), error));
// . . .
}
}
public class Main {
public void eventLoop(List<Peer> peers) {
LinkedBlockingQueue<PeerDownMessage> inbox =
new LinkedBlockingQueue<>();
for (Peer peer : peers) {
PeerWatcher watcher = new PeerWatcher(peer, inbox);
watcher.startTimer();
}
while (true) {
PeerDownMessage message = inbox.take();
SwingWorker.invokeLater(() {
// suppose there is a map of labels for each peer
JLabel label = labels.get(message.peerName());
label.setText(message.peerName() +
" failed with error " + message.errorCode());
});
}
}
}
请注意,要更新 GUI,我们会导致在另一个线程(Swing 事件调度线程)上执行该操作,该线程必须与我们的主线程不同。
您可以使用大型、复杂的框架来实现 actor 模型,但其核心是:线程之间不共享任何内容,因此您永远不需要同步或使任何东西变得不稳定,actor 需要它的任何东西都可以接收作为其构造函数的参数或通过其收件箱(在本例中,只有主线程有一个收件箱,因为工作线程一旦启动就不需要接收任何东西),最好使所有内容不可变。我使用 record
而不是消息类,但您可以使用常规 class
。只需将字段设置为 final,在构造函数中设置它们,并保证它们不能为 null,就像在 PeerWatcher
类中一样。
我说过主线程可以轮询“队列”,这意味着可能有多个队列,但在这种情况下,它们都发送相同类型的消息,并且它们在邮件正文。所以我只是给每个观察者一个对主线程同一个收件箱的引用。这可能是最好的。一个 Actor 应该只有一个收件箱;如果它需要做多件事,它可能应该是多个参与者(这是 Erlang 的方式,这也是我从中获得灵感的地方)。
但是如果你真的需要多个队列,main 可以像这样轮询它们:
while (true) {
for (LinkedBlockingQueue<PeerDownMessage> queue : queues) {
if (queue.peek() != null) {
PeerDownMessage message = queue.take();
handleMessageHowever(message);
}
}
}
但那是您不需要的许多额外内容。坚持每个参与者一个收件箱队列,然后轮询收件箱以获取要处理的消息很简单。
我最初编写此代码是为了使用 ConcurrentLinkedQueue,但我使用了 put
和 take
,它们是 BlockingQueue 的方法。我只是将其更改为使用 LinkedBlockingQueue,但如果您更喜欢 ConcurrentLinkedQueue,则可以使用 add
和 poll
但进一步考虑后,我真的会推荐 BlockingQueue,因为它的 take()
方法;它可以让您在等待下一个可用项目时轻松阻塞,而不是忙着等待。
关于java - 如何将消息从 TimerTask 传递到主线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71443096/