scala - Apache Spark : dealing with Option/Some/None in RDDs

标签 scala exception apache-spark scala-option

我正在映射 HBase 表,为每个 HBase 行生成一个 RDD 元素。但是,有时该行有错误数据(在解析代码中抛出 NullPointerException),在这种情况下我只想跳过它。

我的初始映射器返回 Option表示它返回 0 或 1 个元素,然后过滤 Some ,然后获取包含的值:

// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
  map( tuple => getData(tuple._2) ).
  filter( {case Some(y) => true; case None => false} ).
  map( _.get ).
  // ... more RDD operations with the good data

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  try {
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    Some( ( id, ( List(x),
          // more stuff ...
        ) ) )
  } catch {
    case e: NullPointerException => {
      logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
      None
    }
  }
}

有没有更惯用的方法来做到这一点更短?我觉得这看起来很乱,都在getData()并在 map.filter.map我正在跳舞。

也许是 flatMap可以工作(在 Seq 中生成 0 或 1 个项目),但我不希望它展平我在 map 函数中创建的元组,只需消除空。

最佳答案

另一种通常被忽视的方法是使用 collect(PartialFunction pf) ,这意味着“选择”或“收集”RDD中在部分函数中定义的特定元素。

代码如下所示:

val output = myRDD.collect{case Success(tuple) => tuple }

def getData(r: Result):Try[(String, List[X])] = Try {
        val id = Bytes.toString(key, 0, 11)
        val x = Long.MaxValue - Bytes.toLong(key, 11)
        (id, List(x))
}

关于scala - Apache Spark : dealing with Option/Some/None in RDDs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29103655/

相关文章:

java - JavaSparkContext.wholeTextFiles 的数据集 API 模拟

Python:ValueError 和 Exception 之间的区别?

Java 嵌套 Map 到 Scala 嵌套序列

scala - 如何让 sbt 在本地 Maven 存储库中查找 scala dist?

scala - 使用 MockFactory 模拟配置对象

java - 如何将此 ArrayList 加载到 JTable 中

c# - 无法访问 Azure keyvault 值

hadoop - Spark 将本地文件从主节点分发到节点

apache-spark - 按列保护 Parquet 文件

scala - 避免 Java MPI 绑定(bind)中的命名空间冲突