apache-spark - Reading data from Amazon Redshift in Spark 2.4

Tags: apache-spark pyspark amazon-emr

We used to read data from Redshift in Spark 2.3 via the Databricks connector. The spark-shell was initialized as follows:

spark-shell --jars RedshiftJDBC42-1.2.10.1009.jar --packages com.databricks:spark-redshift_2.11:3.0.0-preview1,com.databricks:spark-avro_2.11:3.2.0

Then:

val url = "jdbc:redshift://cluster-link?user=username&password=password"
val queryFinal = "select count(*) as cnt from table1"
val df = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", url)
  .option("tempdir", "s3n://temp-bucket/")
  .option("query", queryFinal)
  .option("forward_spark_s3_credentials", "true")
  .load()
  .cache

We recently upgraded to Spark 2.4, and this no longer works; it now fails with the following exception:

java.lang.AbstractMethodError: com.databricks.spark.redshift.RedshiftFileFormat.supportDataType(Lorg/apache/spark/sql/types/DataType;Z)Z
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:48)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:47)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:47)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyReadSchema(DataSourceUtils.scala:39)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:400)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:168)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:326)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:325)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:403)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:321)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:289)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:746)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:705)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:714)

I looked through online forums and learned that Spark 2.4 ships a built-in Avro source, which is why the data can no longer be deserialized through the Databricks connector.

I tried two approaches:

  1. Setting spark.sql.legacy.replaceDatabricksSparkAvro.enabled to true, as documented at https://spark.apache.org/docs/latest/sql-data-sources-avro.html. The exception remained the same.

  2. Connecting through a plain JDBC URL, following https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html. My connection timed out (both attempts are sketched just below this list).
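
Roughly what those two attempts looked like in spark-shell. This is only a sketch: the JDBC driver class name is an assumption based on the RedshiftJDBC42 jar and is not taken from the original post.

// Attempt 1: map com.databricks.spark.avro onto the built-in Avro source
// (equivalently passed on the command line as
//  --conf spark.sql.legacy.replaceDatabricksSparkAvro.enabled=true)
spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "true")

val dfRedshift = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", url)
  .option("tempdir", "s3n://temp-bucket/")
  .option("query", queryFinal)
  .option("forward_spark_s3_credentials", "true")
  .load()    // still fails with the AbstractMethodError shown above

// Attempt 2: bypass the connector and use Spark's plain JDBC source.
// The driver class name below is assumed and must match the Redshift JDBC 4.2 jar on the classpath.
val dfJdbc = spark.read
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.amazon.redshift.jdbc42.Driver")
  .option("query", queryFinal)    // the "query" option is available in the JDBC source since Spark 2.4
  .load()    // this is the read that times out in our environment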
Does anyone know of a workaround for this issue? It would really help.

    Best Answer

    As stated in the issue on the Databricks spark-redshift connector, the library is no longer maintained as a separate project, and therefore it does not support Spark 2.4.x.

    If you want to keep using Redshift with Spark 2.4.x, there is an alternative: the Udemy fork. With it, you have to declare the Avro dependency ("org.apache.spark" %% "spark-avro", which ships with the Spark 2.4.0 release) as "provided" in your dependency file, and add the option --packages org.apache.spark:spark-avro_2.12:2.4.3 to the spark-submit command, as described in the Avro documentation. A sketch of that setup follows.
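
    A minimal sketch of that setup, assuming an sbt build; the fork's own artifact coordinates are not shown here and would need to be taken from its documentation, and the Scala suffix (2.11 or 2.12) must match your build:

    // build.sbt: spark-avro already ships with Spark 2.4.x, so mark it "provided"
    libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.3" % "provided"

    // At submit time, pull the matching Avro package in explicitly, per the Avro documentation:
    //   spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.3 ...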

    Regarding apache-spark - Reading data from Amazon Redshift in Spark 2.4, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55736327/
