我的 Cassandra 接收器配置如下所示:
ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
.withPort(props.getCassandraPort())
.withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
.build();
};
CassandraSink
.addSink(cassandraObjectStream)
.setClusterBuilder(secureCassandraSinkClusterBuilder)
.build()
.name("Cassandra-Sink");
现在,当与 Cassandra 的连接配置不正确时,我会得到一个NoHostAvailableException,或者当连接意外断开时,我会得到一个ConnectionTimeOutException,或者有时是一个写入超时异常。这最终会触发一个 JobExecutionException 并且整个 Flink 作业终止。
我在哪里可以捕获这些 Cassandra 异常?这些丢到哪里去了?我尝试在 CassandraSink 周围放置一个 try-catch block ,但没有成功。我想捕获这些异常并在连接超时时重试连接到 Cassandra,或者在写入超时时重试写入 Cassandra。
最佳答案
据我所知,您不能尝试使用 CassandraSink
捕获这些异常。
捕获像TimeoutException这样的异常的一种方法是为Cassandra实现自己的sink,但这可能需要很多时间......
另一种方法是,如果您运行流式作业,您可以通过 StreamingExecutionEnvironment.setRestartStrategy
将任务重试设置为大于 1,并启用检查点,以便流式作业可以根据最后一个检查站。 CassandraSink
支持 WAL,因此可以在启用检查点的情况下实现 EXACTLY_ONCE
。
关于java - 连接 Cassandra 失败时如何处理异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49646646/