java - 如何检查轮询何时停止

标签 java

我有一个消息流,我需要处理来自其中的消息,然后将它们存储在数据库中。在 Java 中,我编写了轮询代码,该代码每 20 秒轮询一次流并使用消息。

这是在无限 for 循环内完成的,如下所示:

for (;;) {
    try{
        //1. Logic for polling.
        //2. Logic for processing the message.
        //3. Logic for storing the message in database.
        Thread.sleep(20000 - <time taken for above 3 steps >); 
    } catch(Exception E){
        //4. Exception handling.
    }
}

此逻辑按预期运行并且对流进行轮询,但偶尔会遇到异常或出现问题并停止轮询。

我想要一种机制,一旦轮询停止,假设这个 for 循环没有运行 60 秒,我应该收到邮件或 ping。

如果这个 for 循环没有运行 60 秒,我该如何调用方法?

我的想法是,每个 for 循环执行都会 ping 一个心跳,当没有从 for 循环收到该心跳 ping 时,就会调用邮件发送。

最佳答案

轮询停止取得进展有两种不同的原因,每种原因都需要不同的方法:

如果逻辑抛出Exception以外的Throwable,例如Error,则catch不匹配,并且执行将离开for 循环,并可能到达线程的 UncaughtExceptionHandler,其默认实现将异常记录到 System.err 并终止线程。为了防止这种情况,您应该捕获 Throwable 而不是 Exception

第二种可能性是逻辑中的某些步骤没有终止,例如由于无限循环、死锁、等待 I/O 操作或其他原因。在这种情况下,您需要进行线程转储来查看线程被卡住的位置。您可以按如下方式自动执行此操作:

class Watchdog {
    final Duration gracePeriod;
    final Thread watchedThread;
    volatile Instant lastProgress;

    public Watchdog(Duration gracePeriod) {
        this.gracePeriod = gracePeriod;
        watchedThread = Thread.currentThread();
        everythingIsFine();
        
        var t = new Thread(this::keepWatch);
        t.setDaemon(true);
        t.start();
    }
    
    public void everythingIsFine() {
        lastProgress = Instant.now();
    }
    
    void keepWatch() {
        while (true) {
            var silence = Duration.between(lastProgress, Instant.now());
            if (silence.compareTo(gracePeriod) > 0) {
                System.err.println("Watchdog hasn't seen any progress for " + silence.toSeconds() + " seconds. The watched thread is currently at:");
                for (var element : watchedThread.getStackTrace()) {
                    System.err.println("\tat " + element);
                }
            }
            try {
                Thread.sleep(gracePeriod);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

您可以按如下方式使用:

public class Test {
    void step() throws Exception {
        System.in.read();
    }
    
    void job() {
        var snoopy = new Watchdog(Duration.ofSeconds(2));
        for (;;) {
            try {
                step();
                snoopy.everythingIsFine();
                Thread.sleep(1000);
            } catch (Throwable t) {
                System.err.println(t);
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        new Test().job();
    }
}

一旦宽限期过去,WatchDog 将打印如下内容:

Watchdog hasn't seen any progress for 2 seconds. The watched thread is currently at:
    at java.base/java.io.FileInputStream.readBytes(Native Method)
    at java.base/java.io.FileInputStream.read(FileInputStream.java:293)
    at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:255)
    at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:289)
    at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:276)
    at stackoverflow.Test.step(Test.java:48)
    at stackoverflow.Test.job(Test.java:55)
    at stackoverflow.Test.main(Test.java:65)

关于java - 如何检查轮询何时停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75497509/

相关文章:

java - 堆积面积图,固定 y 轴标签宽度

java - Freemarker 通过 Java 设置 checkTemplateLocation

Java 继承 : Why super class variable is referred

Java - AES CBC 算法生成 SecretKeySpec 的不同方式

jvm - openjdk代码编译/IDE设置

java - 具有默认系统身份验证/用户的 SecurityContext

java - spring+JPA EntityManager注入(inject)service和dao

java - 将鼠标悬停在 JButton 上时,会出现不需要的文本

java - 打印出变量的每个增量

java - Quarkus 中的 Keycloak 管理客户端 - java.lang.InknownClassChangeError