scala - 如何在 Spark Streaming 中验证 azure iot hub 的连接字符串?

标签 scala apache-spark spark-streaming azure-iot-hub azure-eventhub

我是 azure iot hub 的新手。我正在尝试使用 Spark Streaming 从 azure iot hub 提取消息。当我执行代码时,我收到错误,我可以理解连接字符串中存在一些问题。有没有特定的方法来验证 Spark 中的连接字符串,还请告诉我我指定的格式是否正确。

我的示例代码:

import org.apache.spark.eventhubs._

val eventHubName = "xyztest.azure-devices.net"
val eventHubNSConnStr = "Endpoint=sb://testname.servicebus.windows.net/;SharedAccessKeyName=primary;SharedAccessKey=abcedfgrdxyeurjrsdfyasdf="
val connStr = ConnectionStringBuilder(eventHubNSConnStr).setEventHubName(eventHubName).build 
val customEventhubParameters = EventHubsConf(connStr).setMaxEventsPerTrigger(5)
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

错误:

java.util.concurrent.ExecutionException: com.microsoft.azure.eventhubs.IllegalEntityException: The messaging entity 'sb://testname.servicebus.windows.net/xyztest' could not be found.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:169)

最佳答案

val eventHubName = "xyztest.azure-devices.net"

您似乎设置了错误的事件中心名称。 “xyztest.azure-devices.net”应该是您的 Azure IoT 中心主机名。

要查找事件中心名称,您可以转到物联网中心 -> 端点 -> 事件并复制事件中心兼容名称的值,如下所示:

enter image description here

最后,event hub connect string将具有以下格式:

Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;EntityPath=EVENTHUB_NAME

关于scala - 如何在 Spark Streaming 中验证 azure iot hub 的连接字符串?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51535274/

相关文章:

Scala/Hadoop : Compiler error when using classOf[CustomInputFormat] unless instance of CustomInputFormat exists

python - 如何将 xgboost 集成到 Spark 中? (Python)

java - 无法停止 Kerberos 调试日志记录

java - 使用 Datastax Spark Cassandra 连接器将 PairDStram 写入 cassandra

java - 如何访问 Spark Streaming 自定义接收器存储的元数据?

java - 在 scala 中组合 mongodb 编解码器注册表

scala - 使用Scala argonaut编码嵌套类

scala - 从 sbt 运行 Play 项目

apache-spark - 非 Databricks 平台上的 Spark Delta 格式

apache-spark - 重新启动 Spark 结构化流作业会消耗数百万条 Kafka 消息并死掉