java - 同一流应用程序中的 Kafka Streams 关闭 Hook 和意外异常处理

标签 java exception apache-kafka apache-kafka-streams shutdown-hook

我的任务是拆除开发环境并从废品中重新设置它以验证我们的 CI-CD 流程;唯一的问题是我搞砸了创建一个主题,因此 Kafka Streams 应用程序因错误而退出。

我深入研究并发现了问题并纠正了它,但当我深入研究时,我遇到了另一个奇怪的小问题。

我实现了一个意外异常处理程序,如下所示:

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    streams.close();

    // Maybe do some more exception handling.
    // Open a lock that is waiting after streams.start() call 
    // to let application exit normally
    shutdownLatch.countDown();
});

问题是,如果应用程序在调用 KafkaStreams::close 时由于主题错误而引发异常,则在尝试调用 KafkaStreams::waitOnState 后,应用程序似乎会死锁在 WindowsSelectorImpl::poll 中。

我认为这可能是在异常处理程序中调用 KafkaStreams::close 的问题,但我发现了这个 SO以及来自Matthias J. Sax的评论也就是说,在异常处理程序中调用 KafkaStreams::Close 应该可以,但需要注意的是不要从多个线程调用 KafkaStreams::close。

问题是我想实现一个关闭钩子(Hook)以根据请求优雅地终止 Steams 应用程序,并实现 UnexpectedException 处理程序以在发生异常时优雅地清理和终止。

我提出了以下解决方案,在调用 close 之前检查 KafkaStreams 状态,它确实有效,但它似乎有点不确定,因为除了运行(可能待定)之外,我还可以看到其他情况,我们希望确保 KafkaStreams::关闭它调用。

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.fatal("Caught Shutdown request");
    // Do some shutdown cleanup.
    if (streams.state().isRunning())
    {
        If this hook is called due to the Main exiting after handling 
        an exception we don't want to call close again. It doesn't 
        cause any errors but logs that the application was closed 
        a second time.
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do a little bit more clean up before system exits.
    System.exit(0);

}));

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    if (streams.state().isRunning())
    {
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do some more exception handling.

    // Open the Gate to let application exit normally
    shutdownLatch.countDown();
    // Or Optionally call halt to immediately terminate and prevent call to Shutdown hook.
    Runtime.getRuntime().halt(0);
});

关于为什么在异常处理程序中调用 KafkaSteams:close 会导致此类麻烦的任何建议,或者如果有更好的方法来同时实现关闭 Hook 和异常处理程序,我们将不胜感激?

最佳答案

从异常处理程序和关闭 Hook 调用 close() 略有不同。如果从关闭 Hook 调用,close() 可能会死锁(参见 https://issues.apache.org/jira/browse/KAFKA-4366 ),因此,您应该在超时的情况下调用它。

此外,该问题与从未捕获的异常处理程序中调用 System.exit() 有关,如 Jira 中所述。一般来说,调用 System.exit() 是相当苛刻的,恕我直言,应该避免。

您的解决方案似乎也不是 100% 稳健,因为 streams.state().isRunning() 可能会导致竞争条件。

使用超时的替代方法可能是仅在关闭 Hook 和异常处理程序中设置一个 AtomicBoolean ,并在设置了 boolean 标志的情况下使用“main()”线程调用关闭为真:

private final static AtomicBoolean stopStreams = new AtomicBoolean(false);

public static void main(String[] args) {
  // do stuff

  KafkaStreams streams = ...
  stream.setUncaughtExceptionHandler((t, e) -> {
    stopStreams.set(true);
  });

  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    stopStreams.set(true);
  });

  while (!stopStreams.get()) {
    Thread.sleep(1000);
  }
  streams.close();
}

关于java - 同一流应用程序中的 Kafka Streams 关闭 Hook 和意外异常处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52528678/

相关文章:

java - 将图像从 android 串行发送到 java 应用程序时出错 -javax.imageio.IIOException : Bogus Huffman table definition

java - 如何检查数据库是否在事务之间锁定

java - 背景图像和 JtextField

C++:跨线程错误处理问题

java - JDBC中批量插入

c# - 流利的 NHibernate,实现 ISQLExceptionConverter 的正确方法

c++ - 我的代码中存在死锁/访问冲突,即使我已经相互排除它

apache-kafka - 不同服务器上的多个生产者写入同一主题是否可以接受?

python - 如何使用python在kafka中创建多个生产者和消费者

java - KStream-KStream inner join 抛出java.lang.ClassCastException