python - Error (java.lang.NoSuchMethodError) when sending a Spark DataFrame from a Databricks notebook to Azure Event Hubs

Tags: python azure pyspark azure-databricks azure-eventhub

I need to send a PySpark DataFrame from my Databricks notebook to an Event Hub. The problem occurs in this part of the code:

from pyspark.sql import DataFrame
from pyspark.sql.functions import struct

ehWriteConf = {
  'eventhubs.connectionString': EVENT_HUB_CONNECTION_STRING
}

def send_to_eventhub(df: DataFrame):
    # Pack all columns into a single "body" column and write it to Event Hubs
    ds = df.select(struct(*[c for c in df.columns]).alias("body"))\
      .select("body")\
      .write.format("eventhubs")\
      .options(**ehWriteConf)\
      .save()

I call this method after doing some processing on the DataFrame:

# write feature_df into our EventHub
send_to_eventhub(feature_df)

Some similar questions suggest this is a library version problem, so I have already tried several of the answers I found, such as installing a compatible version of the following library:

com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22

But this is the error message I receive:

java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-37526120346879> in <module>
      5 # write feature_df into our EventHub
      6 
----> 7 send_to_eventhub(feature_df)
      8 
      9 # implement reading data from EventHub through a loop in print statement

<command-2498519353602292> in send_to_eventhub(df)
     34     # .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")\
     35     # .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
---> 36     ds = df.select(struct(*[c for c in df.columns]).alias("body"))\
     37       .select("body")\
     38       .write.format("eventhubs")\

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    736             self.format(format)
    737         if path is None:
--> 738             self._jwrite.save()
    739         else:
    740             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1187.save.
: java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
    at org.apache.spark.sql.eventhubs.EventHubsWriter$.validateQuery(EventHubsWriter.scala:58)
    at org.apache.spark.sql.eventhubs.EventHubsWriter$.write(EventHubsWriter.scala:70)
    at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createRelation(EventHubsSourceProvider.scala:124)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:78)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:89)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:959)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:427)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

Part of the problem is that the error does not make it obvious which method is missing. Decoding the JVM descriptor, it is the constructor AnalysisException(String, Option, Option, Option, Option): the connector calls a constructor that exists in the Spark version it was compiled against, but not in the Spark version bundled with this Databricks Runtime, so the two are binary incompatible.
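Since a runtime NoSuchMethodError almost always means this kind of binary mismatch, a quick first check is to compare the cluster's Spark and Scala versions with the ones the connector artifact targets (the _2.12 suffix gives the Scala line; the connector's release notes give the Spark line). A minimal sketch from the notebook, assuming a live spark session:

# Print the cluster's Spark and Scala versions so they can be compared with
# the versions azure-eventhubs-spark_2.12:2.3.22 was compiled against.
print(spark.version)  # e.g. '3.3.0'
print(spark.sparkContext._jvm.scala.util.Properties.versionString())  # e.g. 'version 2.12.14'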

The details of the cluster I am running the notebook on:

[screenshot: Cluster details]

Best Answer

The DataFrame to be written needs to have the following schema:

Column                    |  Type
----------------------------------------------
body (required)           |  string or binary 
partitionId (*optional)   |  string 
partitionKey (*optional)  |  string
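The optional columns, if used, go alongside body. For instance, a partitionKey pins all events that share a key to the same Event Hubs partition; a sketch under the assumption of a hypothetical device_id column (the basic body-only version is the working example just below):

from pyspark.sql import functions as F

# Serialize each row to JSON in "body" and route events sharing the same
# (hypothetical) device_id to one partition via the optional "partitionKey".
df.withColumn("body", F.to_json(F.struct(*df.columns))) \
  .withColumn("partitionKey", F.col("device_id").cast("string")) \
  .select("body", "partitionKey") \
  .write.format("eventhubs") \
  .options(**ehconf) \
  .save()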

This worked for me:

from pyspark.sql import functions as F

df.withColumn('body', F.to_json(
       F.struct(*df.columns),
       options={"ignoreNullFields": False}))\
   .select('body')\
   .write\
   .format("eventhubs")\
   .options(**ehconf)\
   .save()
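One more detail worth checking when driving recent versions of the connector from PySpark: the azure-event-hubs-spark documentation has the connection string encrypted with the connector's own helper before it goes into the options map. A sketch, assuming sc is the notebook's SparkContext and EVENT_HUB_CONNECTION_STRING is the plain connection string from the question:

# Newer versions of azure-event-hubs-spark expect the connection string to be
# encrypted with the connector's helper when configured from PySpark.
ehconf = {
    'eventhubs.connectionString':
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
            EVENT_HUB_CONNECTION_STRING)
}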

A similar question about this error (java.lang.NoSuchMethodError when sending a Spark DataFrame from a Databricks notebook to Azure Event Hubs) can be found on Stack Overflow: https://stackoverflow.com/questions/73962665/
