apache-spark - How do I write a Spark DataFrame to the Avro file format in a Jupyter notebook?

Tags: apache-spark pyspark jupyter-notebook avro

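I have provisioned an Amazon EMR cluster with 1 master node and 2 core nodes. The following software is installed on EMR:
Hive 2.3.4, Pig 0.17.0, Hue 4.3.0, Ganglia 3.7.2, Spark 2.4.0, TensorFlow 1.12.0.

I have not configured any bootstrap actions. The cluster is now up and waiting for steps. I started a notebook from EMR; the details of the code are below.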

sdf = spark.read.csv('hdfs://i....:8020/user/root/temp.csv')

This executes perfectly, and I can view my DataFrame with sdf.show().

However, when I try to write it to an Avro file, it fails:
sdf.write.format("avro").save("avro_file.avro")

Error:
u'Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'

I tried:
sdf.write.format("org.apache.spark.sql.avro").save("avro_file.avro")

This gives the same error:
u'Failed to find data source: org.apache.spark.sql.avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Failed to find data source: org.apache.spark.sql.avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'

I also tried through an interactive Spark session:
[ec2-user@ip-xxxx conf]$ sudo pyspark --packages org.apache.spark:spark-avro_2.12:2.4.2
Python 2.7.16 (default, Mar 18 2019, 18:38:44)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e8c82e1e-629a-4d83-844d-a86057fc5ae7;1.0
        confs: [default]
        found org.apache.spark#spark-avro_2.12;2.4.2 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 209ms :: artifacts dl 6ms
        :: modules in use:
        org.apache.spark#spark-avro_2.12;2.4.2 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-e8c82e1e-629a-4d83-844d-a86057fc5ae7
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/05/02 07:23:00 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
19/05/02 07:23:03 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-avro_2.12-2.4.2.jar added multiple times to distributed cache.
19/05/02 07:23:03 WARN Client: Same path resource file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 2.7.16 (default, Mar 18 2019 18:38:44)
SparkSession available as 'spark'.
>>> df = spark.createDataFrame(
...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
...     ("id", "v"))
>>> df.write.format("avro").save("avro_file.avro")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 736, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o83.save.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
        at java.util.ServiceLoader.fail(ServiceLoader.java:232)
        at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V
        at org.apache.spark.sql.avro.AvroFileFormat.<init>(AvroFileFormat.scala:44)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
        ... 24 more

>>>


I also tried updating /etc/spark/conf/spark-defaults.conf to contain:
spark.jars.packages org.apache.spark:spark-avro_2.12:2.4.2, com.databricks:spark-csv_2.11:1.5.0

However, after applying this configuration, the Jupyter notebook fails to start Spark, with the following error:
The code failed because of a fatal error:
    Session 4 did not start up in 60 seconds..


Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.

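Best answer

On Spark 2.4.3:

Switching the spark-avro artifact back to the Scala 2.11 build, org.apache.spark:spark-avro_2.11:2.4.3, solved this for me. The NoSuchMethodError on FileFormat.$init$ in the question is the typical symptom of loading a Scala 2.12 artifact (spark-avro_2.12) into a Spark build compiled against Scala 2.11.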
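The suffix in the artifact name (_2.11 or _2.12) is the Scala binary version, that is, the first two components of the full Scala version. A minimal sketch of deriving it follows; the helper name scala_binary_version is hypothetical. In a running PySpark session the full version string can usually be read through the Py4J gateway with spark.sparkContext._jvm.scala.util.Properties.versionNumberString(), but that relies on the private _jvm attribute and is left as a comment here.

```python
def scala_binary_version(full_version):
    """Return the Scala binary version ("2.11") for a full version ("2.11.12").

    Hypothetical helper for illustration; not part of PySpark.
    """
    parts = full_version.split(".")
    if len(parts) < 2 or not all(p.isdigit() for p in parts[:2]):
        raise ValueError("unexpected Scala version: {0!r}".format(full_version))
    return ".".join(parts[:2])


# Assumption: in a live session the input could come from
#   spark.sparkContext._jvm.scala.util.Properties.versionNumberString()
suffix = scala_binary_version("2.11.12")
print("spark-avro_" + suffix)
```

If the suffix printed here differs from the one in the --packages coordinate, the mismatch described above is the likely cause.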

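Also, in your Jupyter notebook, add the following lines before starting the Spark context:

import os
# Must run before the SparkContext is created; the Scala suffix (_2.11) has to match the Spark build.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'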
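To avoid typos in the coordinate, the string can be assembled from its parts. This is a sketch only: the helper names spark_avro_coordinate and pyspark_submit_args are hypothetical, and the version values are the ones from this answer, not values detected from a cluster.

```python
import os


def spark_avro_coordinate(spark_version, scala_binary):
    """Build the Maven coordinate of the spark-avro module (hypothetical helper)."""
    return "org.apache.spark:spark-avro_{0}:{1}".format(scala_binary, spark_version)


def pyspark_submit_args(packages):
    """Join coordinates with commas (no spaces) and append the pyspark-shell token."""
    return "--packages {0} pyspark-shell".format(",".join(packages))


# Versions taken from the answer above; adjust them to the Spark build in use.
coordinate = spark_avro_coordinate("2.4.3", "2.11")
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args([coordinate])
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

The variable only takes effect if it is set before the first SparkSession or SparkContext is created in the kernel; after that, writing with df.write.format("avro") should be able to resolve the data source.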

Source: the original question, "How do I write a Spark DataFrame to the Avro file format in a Jupyter notebook?", on Stack Overflow: https://stackoverflow.com/questions/55947670/
