我有一本 Antonios Chalkiopoulos 的 Programming MapReduce with Scalding。在书中,他讨论了 Scalding 代码的外部操作设计模式。你可以在他的网站上看到一个例子here .我选择使用 Type Safe API .自然地,这会带来新的挑战,但我更喜欢它而不是 Fields API,后者是我之前提到的书中和网站中大量讨论的内容。
我想知道人们是如何使用类型安全 API 实现外部操作模式的。我的初步实现如下:
I create a class that extends com.twitter.scalding.Job which will serve as my Scalding job class where I will 'manage arguments, define taps, and use external operations to construct data processing pipelines'.
I create an object where I define my functions to be used in the Type Safe pipes. Because the Type Safe pipes take as arguments a function, I can then just pass the functions in the object as arguments to the pipes.
这将创建如下所示的代码:
class MyJob(args: Args) extends Job(args) {
import MyOperations._
val input_path = args(MyJob.inputArgPath)
val output_path = args(MyJob.outputArgPath)
val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
case _ => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
}
val eventOutput: FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = this.mode match {
case m: HadoopMode => WritableSequenceFile[LongWritable, Text](output_path)
case _ => TypedTsv[(LongWritable, Text)](output_path)
}
val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.map(convertTextToEither).fork
validatedEvents.filter(isEvent).map(removeEitherWrapper).write(eventOutput)
}
object MyOperations {
def convertTextToEither(v: (LongWritable, Text)): (LongWritable, Either[Text, Event]) = {
...
}
def isEvent(v: (LongWritable, Either[Text, Event])): Boolean = {
...
}
def removeEitherWrapper(v: (LongWritable, Either[Text, Event])): (LongWritable, Text) = {
...
}
}
如您所见,传递给 Scalding Type Safe 操作的函数与作业本身保持分离。虽然这不像所呈现的外部操作模式那样“干净”,但这是编写此类代码的一种快速方法。此外,我可以使用 JUnitRunner 进行作业级集成测试,使用 ScalaTest 进行功能级单元测试。
这篇文章的主要目的是询问人们是如何做这种事情的?互联网上有关 Scalding Type Safe API 的文档很少。是否有更多功能性 Scala 友好的方法来执行此操作?我在这里缺少设计模式的关键组件吗?我对此感到有点紧张,因为使用 Fields API,您可以使用 ScaldingTest 在管道上编写单元测试。据我所知,您不能使用 TypedPipes 做到这一点。请让我知道是否有普遍认可的 Scalding 类型安全 API 模式,或者您如何创建可重用、模块化和可测试的类型安全 API 代码。感谢您的帮助!
Antonios 回复后更新 2
感谢您的回复。这基本上就是我一直在寻找的答案。我想继续谈话。正如我评论的那样,我在您的回答中看到的主要问题是此实现需要特定类型的实现,但是如果类型在您的整个工作中发生变化怎么办?我研究了这段代码,它似乎可以工作,但似乎被黑了。
def self: TypedPipe[Any]
def testingPipe: TypedPipe[(LongWritable, Text)] = self.map(
(firstVar: Any) => {
val tester = firstVar.asInstanceOf[(LongWritable, Text)]
(tester._1, tester._2)
}
)
这样做的好处是我声明了一个 self 的实现,但缺点是这种丑陋的类型转换。此外,我还没有使用更复杂的管道对此进行深入测试。所以基本上,您对如何处理类型变化有何看法,因为它们仅通过一个 self 实现来保持清洁/简洁?
最佳答案
斯卡拉 extension methods使用隐式类实现。 您向编译器添加了将 TypedPipe 转换为包含外部操作的(包装器)类的功能:
import com.twitter.scalding.TypedPipe
import com.twitter.scalding._
import cascading.flow.FlowDef
class MyJob(args: Args) extends Job(args) {
implicit class MyOperationsWrapper(val self: TypedPipe[Double]) extends MyOperations with Serializable
val pipe = TypedPipe.from(TypedTsv[Double](args("input")))
val result = pipe
.operation1
.operation2(x => x*2)
.write(TypedTsv[Double](args("output")))
}
trait MyOperations {
def self: TypedPipe[Double]
def operation1(implicit fd: FlowDef): TypedPipe[Double] =
self.map { x =>
println(s"Input: $x")
x / 100
}
def operation2(datafn:Double => Double)(implicit fd: FlowDef): TypedPipe[Double] =
self.map { x=>
val result = datafn(x)
println(s"Result: $result")
result
}
}
import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.conf.Configuration
object MyRunner extends App {
ToolRunner.run(new Configuration(), new Tool, (classOf[MyJob].getName :: "--local" ::
"--input" :: "doubles.tsv" ::
"--output":: "result.tsv" :: args.toList).toArray)
}
关于如何跨管道管理类型,我的建议是尝试找出一些有意义的基本类型和用例类。要使用您的示例,我会将方法 convertTextToEither
重命名为 extractEvents
:
case class LogInput(l : Long, text: Text)
case class Event(data: String)
def extractEvents( line : LogInput ): TypedPipe[Event] =
self.filter( isEvent(line) )
.map ( getEvent(line.text) )
那么你会有
LogInputOperations
用于LogInput
类型EventOperations
Event
类型
关于scala - Scalding TypedPipe API 外部操作模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34686515/