scala - 如何将 Scalding TypedPipe 转换为 Iterator

标签 scala hadoop iterator pipe scalding

在我的 Scalding hadoop 作业中,我在管道上有一些分组逻辑,然后我需要处理每个组:

val georecs : TypedPipe[GeoRecord] = getRecords

georecs.map( r => (getRegion(r),r) )
  .groupBy(_._1)
  .mapValueStream( xs => clusterRecords(xs) )
  .values
  .write(out)

在 clusterRecords 内部,我需要将传入的迭代器转换为 TypedPipe,以便我可以 1) 对其进行采样和 2) 取叉积:

//turn the iterator to a pipe so we can sample it    
    val sample = TypedPipe.from( xs.map( x => Centroid(x._2.coreActivity)).toIterable)
    .sample(0.11)
    .distinct

//turn the iterator to a pipe so we can take its cross product
val records : TypedPipe[GeoRecord] = TypedPipe.from(xs.map(_._2).toIterable)

records
  .cross(sample)   //cartesian product of records and centroids
  .groupBy( _._2)  // group By the user record so we get a list of pairs (user, centroid)
  .minBy( x => score( x._1.coreActivity, x._2.core) ) //find the centroid with the lowest score for each Record
  .values
  .groupBy( x => x._2 )   //now groupBy centroid to get the clusters
  .values

问题是 mapValueStream 期望映射函数返回一个迭代器,但我拥有的是一个 TypedPipe。我知道如何将迭代器变成管道,但反之则不然。我是否需要执行它,将其写入磁盘,然后再读回?

如果是这样,实现该目标的最佳方法是什么?

最佳答案

看起来您可以通过运行将管道转换为迭代器。这可以像这样完成:

val georecs : TypedPipe[GeoRecord] = getRecords

val i : Iterator[GeoRecord] = georecs
  .toIterableExecution
  .waitFor(this.scaldingConfig,this.mode)
  .get
  .toIterator

(类型检查,但尚未测试)

关于scala - 如何将 Scalding TypedPipe 转换为 Iterator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32507110/

相关文章:

hadoop - 使用 FILE channel 配置 flume 的多个源时出现 channel 锁定错误

multithreading - future 是否在单个线程上执行? (斯卡拉)

java - 链接 Map Reduce 作业时出错

apache-spark - 如何从独立的 Spark 集群访问 azure block 文件系统 (abfss)

rust - 为什么允许我对迭代器的可变引用调用 take() ?

c++ - GCC 标准库实现中(void)强制转换的目的是什么?

c# - GetIterator() 和迭代器模式

scala - Spark : understanding the DAG and forcing transformations

string - Unicode转义中的Scala原始字符串错误

scala - 在哪里下载 Scala RPM 或 DEB 包?