java - 连接 Cassandra 失败时如何处理异常?

标签 java cassandra apache-flink

我的 Cassandra 接收器配置如下所示:

    ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .build();
    };

    CassandraSink
            .addSink(cassandraObjectStream)
            .setClusterBuilder(secureCassandraSinkClusterBuilder)
            .build()
            .name("Cassandra-Sink");

现在,当与 Cassandra 的连接配置不正确时,我会得到一个NoHostAvailableException,或者当连接意外断开时,我会得到一个ConnectionTimeOutException,或者有时是一个写入超时异常。这最终会触发一个 JobExecutionException 并且整个 Flink 作业终止。

我在哪里可以捕获这些 Cassandra 异常?这些丢到哪里去了?我尝试在 CassandraSink 周围放置一个 try-catch block ,但没有成功。我想捕获这些异常并在连接超时时重试连接到 Cassandra,或者在写入超时时重试写入 Cassandra。

最佳答案

据我所知,您不能尝试使用 CassandraSink 捕获这些异常。

捕获像TimeoutException这样的异常的一种方法是为Cassandra实现自己的sink,但这可能需要很多时间......

另一种方法是,如果您运行流式作业,您可以通过 StreamingExecutionEnvironment.setRestartStrategy 将任务重试设置为大于 1,并启用检查点,以便流式作业可以根据最后一个检查站。 CassandraSink 支持 WAL,因此可以在启用检查点的情况下实现 EXACTLY_ONCE

关于java - 连接 Cassandra 失败时如何处理异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49646646/

相关文章:

java - Cassandra java 驱动程序设置全局一致性级别

java - 利息计算器单利计算中的截断错误(涉及DecimalFormat)

java - 在java中获取和编辑Drools规则

java - 将字符串转换为私钥和公钥 (RSA)

scala - Flink 无法序列化 Scala 类/任务不可序列化

maven - 使用 Flink 处理 Twitter 数据时的依赖问题

java - Flink Python 自定义连接器/源代码

java - 我在 map 上有一条路。如何将其转换为函数?

Hadoop MapReduce - Pig/Cassandra - 无法创建输入拆分

solr - 如果 Caasandra 没有搜索功能,FaceBook 如何使用 Cassandra 进行收件箱搜索?