scala - Scalding TypedPipe API 外部操作模式

标签 scala hadoop design-patterns cascading scalding

我有一本 Antonios Chalkiopoulos 的 Programming MapReduce with Scalding。在书中,他讨论了 Scalding 代码的外部操作设计模式。你可以在他的网站上看到一个例子here .我选择使用 Type Safe API .自然地,这会带来新的挑战,但我更喜欢它而不是 Fields API,后者是我之前提到的书中和网站中大量讨论的内容。

我想知道人们是如何使用类型安全 API 实现外部操作模式的。我的初步实现如下:

I create a class that extends com.twitter.scalding.Job which will serve as my Scalding job class where I will 'manage arguments, define taps, and use external operations to construct data processing pipelines'.

I create an object where I define my functions to be used in the Type Safe pipes. Because the Type Safe pipes take as arguments a function, I can then just pass the functions in the object as arguments to the pipes.

这将创建如下所示的代码:

class MyJob(args: Args) extends Job(args) {

  import MyOperations._

  val input_path = args(MyJob.inputArgPath)
  val output_path = args(MyJob.outputArgPath)

  val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
    case _ => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
  }

  val eventOutput: FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => WritableSequenceFile[LongWritable, Text](output_path)
    case _ => TypedTsv[(LongWritable, Text)](output_path)
  }

  val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.map(convertTextToEither).fork
  validatedEvents.filter(isEvent).map(removeEitherWrapper).write(eventOutput)
}

object MyOperations {

  def convertTextToEither(v: (LongWritable, Text)): (LongWritable, Either[Text, Event]) = {
    ...
  }

  def isEvent(v: (LongWritable, Either[Text, Event])): Boolean = {
    ...
  }

  def removeEitherWrapper(v: (LongWritable, Either[Text, Event])): (LongWritable, Text) = {
    ...
  }
}

如您所见,传递给 Scalding Type Safe 操作的函数与作业本身保持分离。虽然这不像所呈现的外部操作模式那样“干净”,但这是编写此类代码的一种快速方法。此外,我可以使用 JUnitRunner 进行作业级集成测试,使用 ScalaTest 进行功能级单元测试。

这篇文章的主要目的是询问人们是如何做这种事情的?互联网上有关 Scalding Type Safe API 的文档很少。是否有更多功能性 Scala 友好的方法来执行此操作?我在这里缺少设计模式的关键组件吗?我对此感到有点紧张,因为使用 Fields API,您可以使用 ScaldingTest 在管道上编写单元测试。据我所知,您不能使用 TypedPipes 做到这一点。请让我知道是否有普遍认可的 Scalding 类型安全 API 模式,或者您如何创建可重用、模块化和可测试的类型安全 API 代码。感谢您的帮助!

Antonios 回复后更新 2

感谢您的回复。这基本上就是我一直在寻找的答案。我想继续谈话。正如我评论的那样,我在您的回答中看到的主要问题是此实现需要特定类型的实现,但是如果类型在您的整个工作中发生变化怎么办?我研究了这段代码,它似乎可以工作,但似乎被黑了。

def self: TypedPipe[Any]

def testingPipe: TypedPipe[(LongWritable, Text)] = self.map(
    (firstVar: Any) => {
        val tester = firstVar.asInstanceOf[(LongWritable, Text)]
        (tester._1, tester._2)
    }
)

这样做的好处是我声明了一个 self 的实现,但缺点是这种丑陋的类型转换。此外,我还没有使用更复杂的管道对此进行深入测试。所以基本上,您对如何处理类型变化有何看法,因为它们仅通过一个 self 实现来保持清洁/简洁?

最佳答案

斯卡拉 extension methods使用隐式类实现。 您向编译器添加了将 TypedPipe 转换为包含外部操作的(包装器)类的功能:

import com.twitter.scalding.TypedPipe
import com.twitter.scalding._
import cascading.flow.FlowDef

class MyJob(args: Args) extends Job(args) {

  implicit class MyOperationsWrapper(val self: TypedPipe[Double]) extends MyOperations with Serializable

  val pipe = TypedPipe.from(TypedTsv[Double](args("input")))

  val result = pipe
    .operation1
    .operation2(x => x*2)
    .write(TypedTsv[Double](args("output")))

}

trait MyOperations {

  def self: TypedPipe[Double]

  def operation1(implicit fd: FlowDef): TypedPipe[Double] =
    self.map { x =>
      println(s"Input: $x")
      x / 100
    }

  def operation2(datafn:Double => Double)(implicit fd: FlowDef): TypedPipe[Double] =
    self.map { x=>
      val result = datafn(x)
      println(s"Result: $result")
      result
    }

}

import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.conf.Configuration

object MyRunner extends App {

  ToolRunner.run(new Configuration(), new Tool, (classOf[MyJob].getName :: "--local" ::
    "--input" :: "doubles.tsv" ::
    "--output":: "result.tsv" :: args.toList).toArray)

}

关于如何跨管道管理类型,我的建议是尝试找出一些有意义的基本类型和用例类。要使用您的示例,我会将方法 convertTextToEither 重命名为 extractEvents :

case class LogInput(l : Long, text: Text)
case class Event(data: String)
def extractEvents( line : LogInput ): TypedPipe[Event] =
  self.filter( isEvent(line) )
      .map ( getEvent(line.text) ) 

那么你会有

  • LogInputOperations 用于 LogInput 类型
  • EventOperations Event 类型

关于scala - Scalding TypedPipe API 外部操作模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34686515/

相关文章:

list - 更快的置换生成器

hadoop - 在 hadoop 中编码为 UTF-8 文件

hadoop - 如何在AWS Athena中创建结构数组-关于 Parquet 数据的配置单元

Scala 和 mockito 测试 twilio MessageFactory.create() 方法

scala - 如何使用shapeless在scala中实现[x] -> x?

scala.tools.reflect.ToolBoxError 异常

hadoop - 将数据从 HDFS 复制到 Hive 中的外部表时出错

C++实时策略模式处理不同数据

asp.net-mvc-3 - 企业中业务实体验证的首选方法

oop - 糟糕的 OOP 有很多只有 1 或 2 个方法的类