r - Sparklyr:使用调用方法列出 R 中目录的内容

标签 r apache-spark sparklyr

无法找到用于通过 Spark 列出目录内容的内置 sparklyr,我正在尝试使用 invoke:

sc <- spark_connect(master = "yarn", config=config)
path <- 'gs:// ***path to bucket on google cloud*** '
spath <- sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', path) 
fs <- sparklyr::invoke(spath, 'getFileSystem')
list <- sparklyr:: invoke(fs, 'listLocatedStatus') 
Error: java.lang.Exception: No matched method found for class org.apache.hadoop.fs.Path.getFileSystem
    at sparklyr.Invoke.invoke(invoke.scala:134)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66) ...

注意:是否有分布式代码可重现示例的指南?考虑到我正在针对特定的 Spark 环境运行,我不知道如何举一个其他人可以效仿的例子。

最佳答案

getFileSystem 方法 takes org.apache.hadoop.conf.Configuration 对象作为第一个参数:

public FileSystem getFileSystem(Configuration conf)
                     throws IOException

Return the FileSystem that owns this Path.

Parameters:

conf - the configuration to use when resolving the FileSystem

因此检索 FileSystem 实例的代码应该大致如下所示:

# Retrieve Spark's Hadoop configuration
hconf <- sc %>% spark_context() %>% invoke("hadoopConfiguration")
fs <- sparklyr::invoke(spath, 'getFileSystem', hconf)

另外 listLocatedStatus takes either Path

public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
                                                                     throws FileNotFoundException,
                                                                            IOException

Path and PathFilter (注意这个实现是 protected):

public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
                                                                    throws FileNotFoundException,
                                                                            IOException

所以如果你想像上面那样构建你的代码,你必须至少提供一个路径

sparklyr:: invoke(fs, "listLocatedStatus", spath)

在实践中,直接获取 FileSystem 可能更容易:

fs <- invoke_static(sc, "org.apache.hadoop.fs.FileSystem", "get",  hconf)

并使用globStatus

lls <- invoke(fs, "globStatus", spath)

其中 spath 是带有通配符的路径,例如:

sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', "/some/path/*")

结果将是一个 R list,可以很容易地对其进行迭代:

lls  %>%
    purrr::map(function(x) invoke(x, "getPath") %>% invoke("toString"))

致谢:

The answerHow can one list all csv files in an HDFS location within the Spark Scala shell?通过 @jaime

注意事项:

  • 一般来说,如果您与重要的 Java API 交互,用 Java 或 Scala 编写代码并提供最小的 R 接口(interface)会更有意义。
  • 对于与特定文件对象存储的交互,使用专用包可能更容易。对于 Google 云存储,您可以查看 googleCloudStorageR .

关于r - Sparklyr:使用调用方法列出 R 中目录的内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52931726/

相关文章:

r - sparklyr - rsparkling as_h2o_frame() 错误 java.lang.IllegalArgumentException : Unsupported argument: (spark. dynamicAllocation.enabled,true)

r - databricks 上的 sql sparklyr sparkr 数据帧转换

python - Reticulate 无法安装 python 包

r - 计算 R 中的小计

apache-spark - Spark Streaming Kafka - 如何在处理所有现有消息后停止流式传输(优雅地)

apache-spark - 如何在 Spark 中生成大字数文件?

java - Spark : Java API filter() shows nothing, 结果与 Spark-shell 不同

R:使用组引用计算行中值之间的差异

python - 基本偏度公式、Python 和 R 之间的不一致偏度结果

r - 如何将变量传递给spark_apply()中调用的函数?