在我的扩展工作中,我有如下代码:
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
class MyJob(args: Args) extends Job(args) {
FileInputFormat.setInputPathFilter(???, classOf[MyFilter])
// ... rest of job ...
}
class MyFilter extends PathFilter {
def accept(path:Path): Boolean = true
}
我的问题是
FileInputFormat.setInputPathFilter
方法的第一个参数需要为org.apache.hadoop.mapreduce.Job
类型。如何在扩展作业中访问Hadoop作业对象?
最佳答案
免责声明
无法提取Job
类。但是您可以(但永远不要这样做!)提取JobConf
。之后,您将能够使用来自mapreduce.v1 API(FileInputFormat.setInputPathFilter
)的org.apache.hadoop.mapred.JobConf
,该API允许存档过滤。
但我建议您不要这样做。阅读答案的结尾,
你该怎么做?
重写stepStrategy
的scalding.Job
方法以实现FlowStepStrategy
。例如,此实现允许更改mapreduce作业的名称
override def stepStrategy: Option[FlowStepStrategy[_]] = Some(new FlowStepStrategy[AnyRef]{
override def apply(flow: Flow[AnyRef], predecessorSteps: util.List[FlowStep[AnyRef]], step: FlowStep[AnyRef]): Unit =
step.getConfig match {
case conf: JobConf =>
# here you can modify the JobConf of each job.
conf.setJobName(...)
case _ =>
}
})
为什么不应该这样做?
只有在使用特定源的情况下,访问JobConf才能添加路径过滤,而在使用其他源时,它将中断。另外,您还将混合使用不同级别的抽象。而且我还没有开始您打算如何知道您实际上需要修改的JobConf(我看到的大多数烫手工作都是多步骤的)
一个人应该如何解决这个问题?
我建议您仔细查看所使用的
Source
类型。我很确定在Pipe
(或TypedPipe
)构造期间或之前,有一个函数可以在其中应用路径过滤。
关于hadoop - 是否可以从Scalding作业访问基础org.apache.hadoop.mapreduce.Job?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37048370/