我有一个消息流,我需要处理来自其中的消息,然后将它们存储在数据库中。在 Java 中,我编写了轮询代码,该代码每 20 秒轮询一次流并使用消息。
这是在无限 for 循环内完成的,如下所示:
for (;;) {
try{
//1. Logic for polling.
//2. Logic for processing the message.
//3. Logic for storing the message in database.
Thread.sleep(20000 - <time taken for above 3 steps >);
} catch(Exception E){
//4. Exception handling.
}
}
此逻辑按预期运行并且对流进行轮询,但偶尔会遇到异常或出现问题并停止轮询。
我想要一种机制,一旦轮询停止,假设这个 for
循环没有运行 60 秒,我应该收到邮件或 ping。
如果这个 for 循环没有运行 60 秒,我该如何调用方法?
我的想法是,每个 for 循环执行都会 ping 一个心跳,当没有从 for 循环收到该心跳 ping 时,就会调用邮件发送。
最佳答案
轮询停止取得进展有两种不同的原因,每种原因都需要不同的方法:
如果逻辑抛出Exception
以外的Throwable
,例如Error
,则catch不匹配,并且执行将离开for 循环,并可能到达线程的 UncaughtExceptionHandler
,其默认实现将异常记录到 System.err
并终止线程。为了防止这种情况,您应该捕获 Throwable
而不是 Exception
。
第二种可能性是逻辑中的某些步骤没有终止,例如由于无限循环、死锁、等待 I/O 操作或其他原因。在这种情况下,您需要进行线程转储来查看线程被卡住的位置。您可以按如下方式自动执行此操作:
class Watchdog {
final Duration gracePeriod;
final Thread watchedThread;
volatile Instant lastProgress;
public Watchdog(Duration gracePeriod) {
this.gracePeriod = gracePeriod;
watchedThread = Thread.currentThread();
everythingIsFine();
var t = new Thread(this::keepWatch);
t.setDaemon(true);
t.start();
}
public void everythingIsFine() {
lastProgress = Instant.now();
}
void keepWatch() {
while (true) {
var silence = Duration.between(lastProgress, Instant.now());
if (silence.compareTo(gracePeriod) > 0) {
System.err.println("Watchdog hasn't seen any progress for " + silence.toSeconds() + " seconds. The watched thread is currently at:");
for (var element : watchedThread.getStackTrace()) {
System.err.println("\tat " + element);
}
}
try {
Thread.sleep(gracePeriod);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
您可以按如下方式使用:
public class Test {
void step() throws Exception {
System.in.read();
}
void job() {
var snoopy = new Watchdog(Duration.ofSeconds(2));
for (;;) {
try {
step();
snoopy.everythingIsFine();
Thread.sleep(1000);
} catch (Throwable t) {
System.err.println(t);
}
}
}
public static void main(String[] args) throws Exception {
new Test().job();
}
}
一旦宽限期过去,WatchDog 将打印如下内容:
Watchdog hasn't seen any progress for 2 seconds. The watched thread is currently at: at java.base/java.io.FileInputStream.readBytes(Native Method) at java.base/java.io.FileInputStream.read(FileInputStream.java:293) at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:255) at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:289) at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:276) at stackoverflow.Test.step(Test.java:48) at stackoverflow.Test.job(Test.java:55) at stackoverflow.Test.main(Test.java:65)
关于java - 如何检查轮询何时停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75497509/