exception - Topology出现Exception则跳过记录

标签 exception apache-kafka apache-kafka-streams

我们正在编写一个 Kafka Streams Topology 来聚合数据并实时显示它们。我们希望使显示尽可能稳健 - 理想情况下记录记录并继续处理任何异常。

根据文档,我们和一些测试

Kafka Streams 很好地支持处理 Producer 中或反序列化期间发生的异常。提供的 LogAndContinueExceptionHandler 提供了我们想要的行为。然而,我们的主要问题是处理过程中发生的异常(例如 .mapValues().leftJoin()

我们的想法基本上是验证先决条件

  1. 在反序列化期间,如果未满足则抛出 DeserializationException(并记录并继续)。
  2. 如果无法执行计算(/by zero error 等),检查处理函数以返回默认值

但是,如果数据中存在不可预见的情况,异常仍可能冒出并且拓扑将关闭。

Kafka Streams 提供了一个UncaughtExceptionHandler,但它是在线程已经死亡后调用的,因此它不能用于防止拓扑关闭。

有什么方法可以编写跳过记录的 UncaughtExceptionHandler 吗?或者我们可以在处理函数内的 try-catch block 中跳过当前记录的机制?

最佳答案

我认为最好的解决方案是以永远不会抛出任何异常的方式编写处理操作(例如:Mapper、Filter 等)。为此,您可以使用一个包装器对象,它可以是成功或错误(例如:scala 中的 Either 类型)。之后,您可以使用 branch() 方法获取两个流:一个用于成功记录,一个用于错误记录。

下面的代码展示了基本思想:

    public static void main(String[] args) {
        var builder = new StreamsBuilder();
        KStream<Object, Result<Object>> stream = builder.stream("my-topic")
            .map((k, v) -> {
                try {
                    // unsafe operation, i.e that may throw an exception
                    return KeyValue.pair(k, new Success<>(v));
                } catch (Exception e) {
                    return KeyValue.pair(k, new Error<>(e));
                }
            });
        KStream<Object, Result<Object>>[] branch = stream.branch((k, v) -> !v.hasError(), (k, v) -> v.hasError());

        // Handle the success steam
        KStream<Object, Result<Object>> successStream = branch[0];

        // Handle the error steam, e.g:  log errors, write errors to a Dead Letter Queue
        KStream<Object, Result<Object>> errorStream = branch[1];
        
    }

    public interface Result<T> {
        T get() throws Exception;
        Exception exception();
        boolean hasError();
    }

    public static class Success<T> implements Result<T> {

        private final T value;

        public Success(T value) {
            this.value = value;
        }

        @Override
        public T get() throws Exception {
            return value;
        }

        @Override
        public Exception exception() {
            return null;
        }

        @Override
        public boolean hasError() {
            return false;
        }
    }

    public static class Error<T> implements Result<T> {

        private final Exception error;

        public Error(Exception error) {  this.error = error; }

        @Override
        public T get() throws Exception{
            throw error;
        }

        @Override
        public Exception exception() {
            return error;
        }

        @Override
        public boolean hasError() {
            return true;
        }
    }

此外,对于您提到的反序列化异常,项目Azkarra Streams提供了一些可以帮助您的方便的 java 类(例如 SafeSerdes、DeadLetterTopicExceptionHandler):GitHub

关于exception - Topology出现Exception则跳过记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64169657/

相关文章:

java - @Transactional 需要回滚所有内容,除了一个保存在数据库中

java - 处理一个不存在的 servlet 异常

c# - UnobservedTaskException - 任务从哪里来

java - 找不到依赖项 'org.apache.kafka:kafka-clients:2.1.1'

scala - Alpakka卡夫卡vs卡夫卡流

apache-kafka - 卡夫卡流 : PolicyViolationException: Topic replication factor must be 3

java - 打印露天异常

apache-kafka - Confluent 平台 Kafka Connect 因退出 137 崩溃

python - Kafka-Python 消耗最后一条未读消息

apache-kafka-streams - Kafka Streams 构建 StateStoreSupplier : API clarifications