当我们从 eventhub 读取事件时,我们正在尝试实现 badRecordsPath,作为尝试让它工作的示例,我已经放入了应该使事件失败的模式:
eventStreamDF = (spark.readStream
.format("eventhubs")
.options(**eventHubsConf)
.option("badRecordsPath", "/tmp/badRecordsPath/test1")
.schema(badSchema)
.load()
)
但这永远不会失败并且总是读取事件,这是 databricks 的 eventhub 的读取流的行为吗?目前的解决方法是根据我们自己的架构检查 inferSchema。
最佳答案
EventHubs 中数据的架构是固定的(请参阅 docs )(对于 Kafka 也是如此) - 实际负载始终编码为名称为 body
的二进制字段,这取决于开发人员根据数据生产者和该数据消费者之间的“联系”来解码该二进制有效负载。因此,即使您指定架构和 badRecordsPath
选项,它们也不会被使用。
您将需要实现一些从 JSON 或其他内容解码数据的函数,例如,如果数据损坏,则返回 null,然后您将拥有一个针对 null 值的过滤器,以将流拆分为两个子流 -好的和坏的数据。
关于azure - Eventhub Stream 未捕获架构不匹配,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69629855/