目前,我正在使用 Java 内置的 Spark Consumer 读取 Kafka Producer 发布的记录(json)并将其存储在 hdfs 中。如果假设我的记录具有以下属性(id、名称、公司、发布日期),目前,我正在处理异常,如果缺少其中一个属性,则程序会引发运行时异常,其中的日志消息显示其中之一该属性丢失,但问题是,由于异常,整个 Spark 作业完全停止。我想通过避免这种情况来处理这些不良记录,这样程序不会停止整个 Spark 作业,而是丢弃并记录这些不良记录而不是抛出异常。
最佳答案
答案将基于意见。这是我会做的,
不要在日志文件中记录拒绝,因为这可能很大,您可能需要重新处理它们。而是为被拒绝的记录创建另一个数据集,并给出拒绝的理由。您的过程将产生 2 个数据集 - 好的数据集和拒绝的数据集。
尽管有可能,但不应将异常用于代码的控制流。我会使用谓词/过滤器/IF 条件的想法,它将检查数据并拒绝不符合谓词/过滤器/IF 条件的数据。
如果您使用异常,则将其绑定(bind)到处理单个记录而不是整个作业。最好避免这种想法。
关于java - 处理(Drop and Log) Kafka producer 发布的不良数据,这样 Spark (Java) Consumer 不会将其存储在 HDFS 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60725014/