apache-spark - 从简单的json文件创建和显示spark数据框

标签 apache-spark dataframe

在本地模式下运行Spark时,以下简单的json DataFrame测试工作正常。这是Scala片段,但是我也成功地在Java和Python中运行了同样的东西:

sparkContext.addFile(jsonPath)

val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
val dataFrame = sqlContext.jsonFile(jsonPath)
dataFrame.show()


我确保jsonPath在驱动程序端和工作程序端均可正常工作。我正在调用addFile ... json文件非常简单:

[{"age":21,"name":"abc"},{"age":30,"name":"def"},{"age":45,"name":"ghi"}]


当我退出本地模式并使用具有单个主设备/工人的单独的Spark服务器时,完全相同的代码将失败。我已经在Scala,Java和Python中尝试了相同的测试,以尝试找到可行的组合。它们基本上都得到相同的错误。以下错误来自Scala驱动程序,但Java / Python错误消息几乎相同:

15/04/17 18:05:26 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.0.2.15): java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)


这非常令人沮丧。我基本上是在尝试从官方文档中获取代码片段。

更新:谢谢保罗的深入答复。在执行相同步骤时出现错误。仅供参考,之前我使用的是驱动程序,因此名称为sparkContext,而不是shell的默认名称sc。这是一个简短的代码段,其中删除了过多的日志记录:

➜  spark-1.3.0  ./bin/spark-shell --master spark://172.28.128.3:7077
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.11.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val dataFrame = sqlContext.jsonFile("/private/var/userspark/test.json")
15/04/20 18:01:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.0.2.15): java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
    at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
    (...)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.0.2.15): java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)

最佳答案

虽然我可以使您的简单示例正常工作,但我同意火花可能令人沮丧...

在这里,我有了使用openjdk 8从源代码构建的spark 1.3.0。

由于不同的原因,将文件与spark-shellspark-submit一起使用失败,与发布的代码相比,示例/文档可能已过时,并且需要稍作调整。

例如,在spark-shell中,sparkContext已经可以作为sc而不是作为sparkContext使用,并且有一个类似的预定义的sqlContext。 spark-shell发出INFO消息,宣布这些上下文的创建。

对于spark-submit,我遇到某种jar错误。那可能是本地问题。

无论如何,如果我缩短它,它运行良好。就执行此简短示例而言,json文件每行是否有一个对象似乎也无关紧要。对于将来的测试,生成一个大型示例并确定它是否跨内核并行运行以及是否需要每行一个对象(没有逗号或大括号)来完成此操作可能会很有用。

so1-works.sc

val dataFrame = sqlContext.jsonFile("/data/so1.json")
dataFrame.show()


输出,禁止显示INFO等消息...

paul@ki6cq:~/spark/spark-1.3.0$ ./bin/spark-shell --master spark://192.168.1.10:7077 <./so1-works.sc 2>/dev/null
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.8.0_40-internal)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> 

scala> val dataFrame = sqlContext.jsonFile("/data/so1.json")
dataFrame: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> dataFrame.show()
age name
21  abc 
30  def 
45  ghi 

scala> Stopping spark context.
paul@ki6cq:~/spark/spark-1.3.0$ paul@ki6cq:~/spark/spark-1.3.0$ 


奇怪的是:之后,我必须执行reset才能使我的Linux终端恢复正常。

好的,首先,像我一样尝试缩短示例。

如果仍不能解决问题,则可以尝试复制我的环境。

这可能很简单,因为我使用docker作为master和worker并将映像发布到公共dockerhub。

将来的读者注意:我的公共dockerhub图像不是用于产生火花的正式图像,并且可能会更改或删除。

您需要在家庭防火墙路由器设备(DLink,后置主机,DLink, Netgrear等)。我假设本地网络是192.168.1。*,192.168.1.10和.11是免费的,并且路由器可以正确路由,或者您知道如何使其正确路由。您可以在下面的运行脚本中更改这些地址。

如果只有一台计算机,由于桥接我在这里使用的网络方法的特殊性,可能无法正常工作以与主机进行通信。
可以使用它,但是比我想添加到已经很长的帖子中要多一些。

在一台Linux计算机上,install dockerpipework utility和这些Shell脚本(调整授予火花的内存,似乎不需要删除多余的工作程序):

./run-docker-spark

#!/bin/bash
sudo -v
MASTER=$(docker run --name="master" -h master --add-host master:192.168.1.10 --add-host spark1:192.168.1.11 --add-host spark2:192.168.1.12 --add-host spark3:192.168.1.13 --add-host spark4:192.168.1.14 --expose=1-65535 --env SPARK_MASTER_IP=192.168.1.10 -d drpaulbrewer/spark-master:latest)
sudo pipework eth0 $MASTER 192.168.1.10/24@192.168.1.1
SPARK1=$(docker run --name="spark1" -h spark1 --add-host master:192.168.1.10 --add-host spark1:192.168.1.11 --add-host spark2:192.168.1.12 --add-host spark3:192.168.1.13 --add-host spark4:192.168.1.14 --expose=1-65535 --env mem=10G --env master=spark://192.168.1.10:7077 -v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
sudo pipework eth0 $SPARK1 192.168.1.11/24@192.168.1.1


./stop-docker-spark

#!/bin/bash
docker kill master spark1
docker rm master spark1


另一台Linux计算机将是您的​​用户计算机,并且需要构建spark-1.3.0。在两台计算机上都创建一个/ data目录,并在其中安装json文件。然后,在充当将容纳主服务器和工作服务器的容器(例如VM)的组合主机的计算机上,仅运行一次./run-docker-spark。要停止火花系统,请使用停止脚本。如果重新启动或出现错误,则需要先运行stop脚本,然后运行脚本才能再次运行。

检查主服务器和工作服务器是否已在http://192.168.1.10:8080配对

如果是这样,那么您应该很好地尝试使用spark-shell命令行。

您不需要这些dockerfile,因为构建已发布在公共dockerhub上,并且下载是在docker run中自动进行的。但是在这里,它们是万一您想了解事物的构建方式,JDK,maven命令等的情况。

我从放置在名为spark-roasted-elephant的目录中的常见Dockerfile开始,因为这是一个非hadoop版本,而针对hadoop的O'Reilley书中有一个大象。您需要Spark网站上的spark-1.3.0源tarball才能放入Dockerfile目录。该Dockerfile可能未公开足够的端口(spark对其端口的使用非常混杂,而不幸的是docker被设计为包含并记录了端口的使用),并且在运行master和worker的shell脚本中覆盖了暴露。如果您要求docker列出正在运行的内容,这将导致某些不满意,因为其中包括端口列表。

paul@home:/Z/docker$ cat ./spark-roasted-elephant/Dockerfile
# Copyright 2015 Paul Brewer http://eaftc.com
# License: MIT
# this docker file builds a non-hadoop version of spark for standalone experimentation
# thanks to article at http://mbonaci.github.io/mbo-spark/ for tips
FROM ubuntu:15.04
MAINTAINER drpaulbrewer@eaftc.com
RUN adduser --disabled-password --home /spark spark
WORKDIR /spark
ADD spark-1.3.0.tgz /spark/ 
WORKDIR /spark/spark-1.3.0
RUN sed -e 's/archive.ubuntu.com/www.gtlib.gatech.edu\/pub/' /etc/apt/sources.list > /tmp/sources.list && mv /tmp/sources.list /etc/apt/sources.list
RUN apt-get update && apt-get --yes upgrade \
    && apt-get --yes install sed nano curl wget openjdk-8-jdk scala \
    && echo "JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64" >>/etc/environment \
    && export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" \
    && ./build/mvn -Phive -Phive-thriftserver -DskipTests clean package \
    && chown -R spark:spark /spark \
    && mkdir /var/run/sshd
EXPOSE 2222 4040 6066 7077 7777 8080 8081 


master是使用dockerfile和shell脚本从目录./spark-master构建的,并包含在容器中。这是dockerfile和shell脚本。

paul@home:/Z/docker$ cat ./spark-master/Dockerfile
FROM drpaulbrewer/spark-roasted-elephant:latest
MAINTAINER drpaulbrewer@eaftc.com
ADD my-spark-master.sh /spark/
USER spark
CMD /spark/my-spark-master.sh

paul@home:/Z/docker$ cat ./spark-master/my-spark-master.sh
#!/bin/bash -e
cd /spark/spark-1.3.0
# set SPARK_MASTER_IP to a net interface address, e.g. 192.168.1.10
export SPARK_MASTER_IP
./sbin/start-master.sh 
sleep 10000d


对于工人:

paul@home:/Z/docker$ cat ./spark-worker/Dockerfile
FROM drpaulbrewer/spark-roasted-elephant:latest
MAINTAINER drpaulbrewer@eaftc.com
ADD my-spark-worker.sh /spark/
CMD /spark/my-spark-worker.sh
paul@home:/Z/docker$ cat ./spark-worker/my-spark-worker.sh
#!/bin/bash -e
cd /spark/spark-1.3.0
sleep 10
# dont use ./sbin/start-slave.sh it wont take numeric URL
mkdir -p /Z/data
mkdir -p /user/hive/warehouse
chown -R spark:spark /user
su -c "cd /spark/spark-1.3.0 && ./bin/spark-class org.apache.spark.deploy.worker.Worker --memory $mem $master" spark


尽管到目前为止,这篇文章已经变成了“我如何使Dockerfile成为火花?”的答案。它不是故意的。这些Dockerfile对我来说是实验性的,我不在生产中使用它们,也不保证其质量。我不喜欢在Docker上广为人知的Spark来源,因为他们从事了将一堆容器链接在一起的懒惰实践,而且它庞大而且花了很多时间才能下载。在这里,层数少得多,下载量也较小。发布的内容不只是docker示例,而是您可以确定自己环境中的不同之处。

关于apache-spark - 从简单的json文件创建和显示spark数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29710932/

相关文章:

python - 使用列表中的 startswith 的 Pyspark 过滤器

python - Impala 查询在 Pyspark 中返回错误结果

apache-spark - 坚持在内存中无法在 Spark 中工作

python - 选择特征后打印列/变量名称

R:将xts或zoo对象转换为数据框

python - 循环遍历数据框中的数组并按组绘制

apache-spark - spark-submit --file hdfs://file被缓存在驱动程序的/tmp中

apache-spark - 如何将 hadoop conf 目录指定为 conf 属性以启动应用程序

json - 将 JSON 文件转换为 data.frame

python - Pandas merge_asof 问题 : one-to-multiple merge