azure - Eventhub Stream 未捕获架构不匹配

标签 azure databricks azure-databricks azure-eventhub

当我们从 eventhub 读取事件时,我们正在尝试实现 badRecordsPath,作为尝试让它工作的示例,我已经放入了应该使事件失败的模式:

eventStreamDF = (spark.readStream
  .format("eventhubs")
  .options(**eventHubsConf)
  .option("badRecordsPath", "/tmp/badRecordsPath/test1")
  .schema(badSchema)
  .load()
) 

但这永远不会失败并且总是读取事件,这是 databricks 的 eventhub 的读取流的行为吗?目前的解决方法是根据我们自己的架构检查 inferSchema。

最佳答案

EventHubs 中数据的架构是固定的(请参阅 docs )(对于 Kafka 也是如此) - 实际负载始终编码为名称为 body 的二进制字段,这取决于开发人员根据数据生产者和该数据消费者之间的“联系”来解码该二进制有效负载。因此,即使您指定架构和 badRecordsPath 选项,它们也不会被使用。

您将需要实现一些从 JSON 或其他内容解码数据的函数,例如,如果数据损坏,则返回 null,然后您将拥有一个针对 null 值的过滤器,以将流拆分为两个子流 -好的和坏的数据。

关于azure - Eventhub Stream 未捕获架构不匹配,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69629855/

相关文章:

azure - 运行示例 Azure AD B2C 身份验证代码时遇到 x-frame-options 问题

python - 无法迁移 Azure Django 应用程序的 Postgresql DB

azure - Iot Hub>Event Hub>FA 是否存储发送消息的顺序

databricks - 如何防止 Delta Lake 检查点在 Azure Databricks 中被删除?

azure - Databricks FileInfo : java. lang.ClassCastException : com. databricks.backend.daemon.dbutils.FileInfo 无法转换为 com.databricks.service.FileInfo

azure - Databricks 笔记本时间表

Azure blob 容器 listBlobs 用于在 java 中获取具有特定扩展名的文件

python - 更改 databricks 笔记本的默认语言

python - 使用 Databricks 将 Docx 保存在 Azure Blob 存储中

python - Azure 数据湖 - 使用 Python 读取