hadoop - 是否可以从Scalding作业访问基础org.apache.hadoop.mapreduce.Job?

标签 hadoop amazon-web-services elastic-map-reduce cascading scalding

在我的扩展工作中,我有如下代码:

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

class MyJob(args: Args) extends Job(args) {
  FileInputFormat.setInputPathFilter(???, classOf[MyFilter])
  // ... rest of job ...
}

class MyFilter extends PathFilter {
  def accept(path:Path): Boolean = true
}

我的问题是FileInputFormat.setInputPathFilter方法的第一个参数需要为org.apache.hadoop.mapreduce.Job类型。如何在扩展作业中访问Hadoop作业对象?

最佳答案

免责声明

无法提取Job类。但是您可以(但永远不要这样做!)提取JobConf。之后,您将能够使用来自mapreduce.v1 API(FileInputFormat.setInputPathFilter)的org.apache.hadoop.mapred.JobConf,该API允许存档过滤。

但我建议您不要这样做。阅读答案的结尾,

你该怎么做?

重写stepStrategyscalding.Job方法以实现FlowStepStrategy。例如,此实现允许更改mapreduce作业的名称

override def stepStrategy: Option[FlowStepStrategy[_]] = Some(new FlowStepStrategy[AnyRef]{
  override def apply(flow: Flow[AnyRef], predecessorSteps: util.List[FlowStep[AnyRef]], step: FlowStep[AnyRef]): Unit =
    step.getConfig match {
      case conf: JobConf =>
        # here you can modify the JobConf of each job.
        conf.setJobName(...)
      case _ =>
    }
})

为什么不应该这样做?

只有在使用特定源的情况下,访问JobConf才能添加路径过滤,而在使用其他源时,它将中断。另外,您还将混合使用不同级别的抽象。而且我还没有开始您打算如何知道您实际上需要修改的JobConf(我看到的大多数烫手工作都是多步骤的)

一个人应该如何解决这个问题?

我建议您仔细查看所使用的Source类型。我很确定在Pipe(或TypedPipe)构造期间或之前,有一个函数可以在其中应用路径过滤。

关于hadoop - 是否可以从Scalding作业访问基础org.apache.hadoop.mapreduce.Job?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37048370/

相关文章:

hadoop - hive 开始失败

amazon-web-services - Cloudformation 蓝/绿部署 HealthCheckGracePeriodSeconds

amazon-web-services - CloudFormation 将变量从一个堆栈传递到另一个堆栈

hadoop - 从 Hadoop 中删除文件/文件夹

amazon-s3 - 使用 S3DistCp 将文件从 S3 复制到 EMR

java - 使用 Java 以独立模式运行 Spark

hadoop - 了解 htfp url 和 hdfs 路径之间的区别

python - Pydoop 卡在 HDFS 文件的 readline 上

hadoop - Cascalog可以链接到外部Hadoop集群吗?

amazon-web-services - 使用glueContext.write_dynamic_frame.from_options 的AWS Glue 导出到 Parquet 问题