我正在使用 Spark 1.1。 我有一个 Spark 作业,它只在存储桶下寻找特定模式的文件夹(即以...开头的文件夹),并且应该只处理那些。我通过执行以下操作实现了这一点:
FileSystem fs = FileSystem.get(new Configuration(true));
FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
List<FileStatus> statusList = Arrays.asList(statusArr);
List<String> pathsStr = convertFileStatusToPath(statusList);
JavaRDD<String> paths = sc.parallelize(pathsStr);
但是,当在 Google Cloud Storage 路径上运行此作业时:gs://rsync-1/2014_07_31*(使用最新的 Google Cloud Storage Connector 1.2.9),出现以下错误:
4/10/13 10:28:38 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/13 10:28:38 INFO util.Utils: Successfully started service 'Driver' on port 60379.
14/10/13 10:28:38 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@hadoop-w-9.c.taboola-qa-01.internal:45212/user/Worker
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.IllegalArgumentException: Wrong bucket: rsync-1, in path: gs://rsync-1/2014_07_31*, expected bucket: hadoop-config
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:100)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:294)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:457)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:163)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1052)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1027)
at com.doit.customer.dataconverter.Phase0.main(Phase0.java:578)
... 6 more
当我在本地文件夹上运行此作业时,一切正常。
hadoop-config 是我用来在 Google Compute Engine 上部署 Spark 集群的 bucket(使用 bdutil 0.35.2 工具)
最佳答案
简答题
而不是使用:
FileSystem fs = FileSystem.get(new Configuration(true));
FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
List<FileStatus> statusList = Arrays.asList(statusArr);
你需要做的
Path inputPathObj = new Path(inputPath);
FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
FileStatus[] statusArr = fs.globStatus(inputPathObj);
List<FileStatus> statusList = Arrays.asList(statusArr);
因为在 Hadoop 中,FileSystem 实例是基于 scheme
共享的和 authority
URI 的组成部分(以及更高级设置中的潜在用户组信息),并且此类实例在方案和权限之间不可互换。
长答案
这与 hostname
之间的区别有关和 path
URI
的组成部分[scheme]://[authority]/[path],这在 HDFS 用例中可能更明显,但也适用于 GCS。基本上有几个get
org.apache.hadoop.fs.FileSystem 中的方法,这里最适用的是:
public static FileSystem get(Configuration conf)
和
public static FileSystem get(URI uri, Configuration conf)
前者实际上只是调用后者:
return get(getDefaultUri(conf), conf);
哪里getDefaultUri(conf)
由 fs.default.name
定义或 fs.defaultFS
.第二个考虑因素是具有不同 hosthname
的文件系统。或 authority
组件被认为是本质上不同的文件系统;在 HDFS 的情况下,这是有道理的,因为:
FileSystem.get("hdfs://foo-cluster-namenode/", conf);
FileSystem.get("hdfs://bar-cluster-namenode/", conf);
每个点都在可能完全不同的文件系统实例上,在不同的集群上,允许在两个不同的 HDFS 实例上使用相同的路径名来引用不同的存储命名空间。尽管在机器的“主机名”方面不太透明,bucket
在 GCS 中确实扮演了 authority
的角色GCE URI 的组成部分——在 Hadoop 中,这意味着 FileSystem.get
当 bucket
时,从字面上返回相同的缓存 Java 文件系统对象。相同,但不同桶的实例不同。正如您不能创建 HDFS 实例并将其指向不同的权限一样:
// Can't mix authorities!
FileSystem.get("hdfs://foo/", conf).listStatus(new Path("hdfs://bar/"));
当你调用 FileSystem.get(conf)
你实际上得到了一个指向 gs://hadoop-config/
的缓存实例,然后用它来尝试列出 gs://rsync-1
.
相反,当您知道要操作的路径时,那应该是您获取 FileSystem 实例的时间:
FileSystem fs = FileSystem.get(myPath.toUri(), new Configuration(true));
fs.globStatus(myPath);
关于hadoop - 使用 globStatus 和 Google Cloud Storage 存储桶作为输入时无法运行 Spark 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26450708/