apache-spark - 以编程方式将 Databricks spark-csv 添加到 Spark 1.6.2 客户端

标签 apache-spark pyspark apache-spark-sql databricks

我正在使用 Spark 1.6.2,开发一个 Python Spark 客户端(它以 yarn-client 模式运行)。这里重要的是,始终在客户端计算机中,我无法通过 Spark 提交我的 Python 脚本,但我需要将该脚本作为 Python 脚本运行

在代码的某个点,我需要在 HDFS 中加载 CSV 文件作为 Spark Dataframe(即使用 SQL 上下文)。如您所知,Spark 1.6.2 没有对基于 CSV 的数据帧的 native 支持,并且 Databricks spark-csv必须使用。

数据加载语句如下:

df = sql_context.read.load(format='com.databricks.spark.csv', path=url, header=True, inferSchema=False, delimiter=',')

问题是找不到com.databricks.spark.csv

我知道必须下载 Databricks Spark-csv jar 并将其放在某个地方。问题是:在哪里?这是客户端计算机或集群中的要求吗?

因为我不知道,所以我在客户端计算机上尝试过此操作,但没有成功:

  • 导出 PYTHONPATH=/path/where/jars/were/downloaded/
  • conf = SparkConf().set('spark.jars', '/path/where/jars/were/downloaded/')
  • conf = SparkConf().set('spark.driver.extraClassPath', '/path/where/jars/were/downloaded/')。 [ref ]

我也在 Spark 集群上尝试过这个,但也没有成功:

  • 通过 Ambari 设置自定义 Spark-defaults 属性 spark.jars
  • 通过 Ambari 设置自定义 Spark-defaults 属性 spark.driver.extraClassPath

我会记住,--jars--packages 等命令行选项不适合我,因为我没有运行任何 Spark 脚本: )

其他解决方案,例如使用 addJar() 在 Spark 上下文中设置 jar 将不起作用,因为 Spark 1.6.2 没有实现它。

那么,知道我的代码如何找到 Databricks Spark-csv jar 吗?

以防万一,这是错误跟踪:

java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv. Please find packages at http://spark-packages.org
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:77)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:102)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.databricks.spark.csv.DefaultSource
        at java.net.URLClassLoader$1.run(URLClassLoader.java:359)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:348)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:347)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
        at scala.util.Try.orElse(Try.scala:82)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:62)
        ... 14 more

我读过的其他帖子...

提前致谢。

最佳答案

最后,我找到了this Databricks Github 上的问题,@drorata 的答案对我有用:

export PACKAGES="com.databricks:spark-csv_2.11:1.3.0"
export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"

通过导出上述环境变量,Databrick Spark-csv 包(和依赖项)下载到我的本地 .ivy2 文件夹,并在创建 Spark 上下文时自动上传到集群。

关于apache-spark - 以编程方式将 Databricks spark-csv 添加到 Spark 1.6.2 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47471534/

相关文章:

scala - 如何在 Spark-Submit 应用程序中执行 S3-dist-cp 命令

斯卡拉 Spark : How to create a RDD from a list of string and convert to DataFrame

apache-spark - 如何使用 PySpark 对 Delta 文件的分区动态执行插入覆盖?

pyspark - 如何将 PySpark 数据框写入 DynamoDB 表?

apache-spark - 加速实验,限制Spark为单核

apache-spark - Spark 重新分区落入单个分区

pyspark - Delta Time Travel 带有 SQL 错误无关输入 '0' 期望 {<EOF>, ';' }(第 1 行,位置 38)

python - 用 pyspark 替换数据框中一列的所有值

python - Spark : What's the difference between spark. sql 和 sqlCtx.sql

dataframe - pyspark中有没有一种方法可以计算唯一值