java - 处理(Drop and Log) Kafka producer 发布的不良数据,这样 Spark (Java) Consumer 不会将其存储在 HDFS 中

标签 java apache-spark exception error-handling apache-kafka

目前,我正在使用 Java 内置的 Spark Consumer 读取 Kafka Producer 发布的记录(json)并将其存储在 hdfs 中。如果假设我的记录具有以下属性(id、名称、公司、发布日期),目前,我正在处理异常,如果缺少其中一个属性,则程序会引发运行时异常,其中的日志消息显示其中之一该属性丢失,但问题是,由于异常,整个 Spark 作业完全停止。我想通过避免这种情况来处理这些不良记录,这样程序不会停止整个 Spark 作业,而是丢弃并记录这些不良记录而不是抛出异常。

最佳答案

答案将基于意见。这是我会做的,

不要在日志文件中记录拒绝,因为这可能很大,您可能需要重新处理它们。而是为被拒绝的记录创建另一个数据集,并给出拒绝的理由。您的过程将产生 2 个数据集 - 好的数据集和拒绝的数据集。

尽管有可能,但不应将异常用于代码的控制流。我会使用谓词/过滤器/IF 条件的想法,它将检查数据并拒绝不符合谓词/过滤器/IF 条件的数据。

如果您使用异常,则将其绑定(bind)到处理单个记录而不是整个作业。最好避免这种想法。

关于java - 处理(Drop and Log) Kafka producer 发布的不良数据,这样 Spark (Java) Consumer 不会将其存储在 HDFS 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60725014/

相关文章:

java - 使用 pdfTron 创建带有参数 Inputstream 的 PDFDoc 时出现问题

validation - java.lang.ClassFormatError : Absent Code attribute in method that is not native or abstract in class file javax/validation/Validation

java - 无法在android上构建应用程序ionic

java - 无法对任何实例化的 Spark 数据结构进行操作?

java - SpringBoot : create object from generic type in generic mapper

apache-spark - PySpark + Cassandra : Getting distinct values of partition key

scala - 如何在spark中为diff文件名调用单独的逻辑

C++ 如何从线程生成函数捕获 Boost 中线程抛出的异常

java - 应用程序关闭/重新启动时如何关闭 db4o 连接?

java - 如何计算 Android TextView 中的字符数?