apache-spark - 如何使用Databricks S3-SQS连接器读取结构化流中的SQS消息?

标签 apache-spark amazon-sqs spark-structured-streaming

我正在尝试使用以下代码使用 Spark Streaming 读取来自 sqs 的消息

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val df = spark.readStream.format("s3-sqs").option("queueUrl", "https://sqs.us-east-1.amazonaws.com/XXXX").option("region","us-east-1").option("awsAccessKey","xxxxx").option("fileFormat", "json").option("sqsFetchInterval", "1m") .load()


spark2-shell --jars /jars_aws/hadoop-aws-2.7.3.jar,/jars_aws/aws-java-sdk-1.11.582.jar,/jars_aws/aws-java-sdk-s3-1.11.584.jar,/jars_aws/aws-java-sdk-sqs-1.11.584.jar

我遇到了异常,说 ClassNotFound 异常

java.lang.ClassNotFoundException: Failed to find data source: s3-sqs. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 53 elided
Caused by: java.lang.ClassNotFoundException: s3-sqs.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 54 more

请帮忙

添加了所需的 jar

最佳答案

该错误表明 --jars 中没有 jar 具有 s3-sqs 数据源所需的类。

经过一番谷歌搜索和阅读后Optimized S3 File Source with SQS (这似乎是官方文档)我认为 s3-sqs 数据源(又名 Databricks S3-SQS 连接器)是 Databricks Runtime (DBR) 和 Databricks 特定的一部分。

换句话说,我认为该连接器仅在 Databricks 笔记本中可用,似乎无法在外部使用它。

关于apache-spark - 如何使用Databricks S3-SQS连接器读取结构化流中的SQS消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57326658/

相关文章:

performance - 如何调出内存异常 Spark

python - delete_message_batch 并没有真正从 SQS 队列中删除消息

python - 使用结构化流(PySpark)运行链式查询

apache-spark - 如何调用从 Spark 作业调用的 Web 服务?

apache-spark - 找不到 SparkSQL key : scale

scala - 在 org.apache.spark.sql.Column 中使用 rlike

apache-spark - 运行 Spark 作业的副作用是否有意义?

linux - 如何绕过 Linux "Too Many Arguments"限制

java - spring boot中多个配置属性@Autowired其中一个属性为null

apache-spark - 如何在一个微批中设置最大行数?