scala - Spark 管示例

标签 scala streaming pipe apache-spark

我是 Spark 的新手,正在尝试了解管道方法的工作原理。我在 Scala 中有以下代码

sc.textFile(hdfsLocation).pipe("preprocess.py").saveAsTextFile(hdfsPreprocessedLocation)

hdfsLocation 和 hdfsPreprocessedLocation 的值很好。作为证明,以下代码可从命令行运行

hadoop fs -cat hdfsLocation/* | ./preprocess.py | head

当我运行上面的 Spark 代码时,出现以下错误

14/11/25 09:41:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Cannot run program "preprocess.py": error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041)
at org.apache.spark.rdd.PipedRDD.compute(PipedRDD.scala:119)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:135)
at java.lang.ProcessImpl.start(ProcessImpl.java:130)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1022)
... 12 more

为了解决 Hadoop 流的这个问题,我只使用 --files 属性,所以我对 Spark 尝试了同样的事情。我使用以下命令启动 Spark

bin/spark-shell --files ./preprocess.py

但这给出了同样的错误。

我找不到通过管道将 Spark 与外部进程一起使用的好示例,因此我不确定我是否正确地执行了此操作。任何帮助将不胜感激。

谢谢

最佳答案

我不确定这是否是正确的答案,所以我不会最终确定,但在本地和集群模式下运行 spark 时,文件路径似乎不同。在不使用 --master 的情况下运行 spark 时,管道命令的路径是相对于本地计算机的。当使用 --master 运行 spark 时,管道命令的路径是 ./

更新: 这其实是不正确的。我正在使用 SparkFiles.get() 来获取文件名。事实证明,当在 RDD 上调用 .pipe() 时,命令字符串在驱动程序上进行评估,然后传递给 worker。因为这个 SparkFiles.get() 不是获取文件名的合适方法。文件名应该是 ./因为 SparkContext.addFile() 应该将该文件放在 ./相对于每个工作人员运行的位置。但是我现在对 .pipe 很不满意,以至于我已经从我的代码中完全删除了 .pipe,转而使用 .mapPartitions 结合我编写的 PipeUtils 对象 here .这实际上更有效,因为我只需为每个分区承担一次脚本启动成本,而不是每个示例一次。

关于scala - Spark 管示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27133482/

相关文章:

scala - 如何将当前行的值与下一个相除?

scala - SBT 未解决 Squeryl 依赖关系

video - 如果嵌入YouTube实时流视频,是否会增加服务器负载?

Linux 所有输出到文件

bash - FFmpeg 通过文件输入计算出 mp3 持续时间,但使用管道失败?

java - 在 JSoup 文档中获取所有节点(递归)的最快方法

java - 如何增加java/scala中的线程分布?

android - MediaPlayer,只有视频 m3u8 HTML 流工作

nginx - 使用 NGINX 调整数据传输成本

python - 删除 Unix/bash 中与列条件匹配的行