scala - Mapping Cassandra rows to a parameterized type in a Spark RDD

Tags: scala apache-spark cassandra spark-cassandra-connector

I am trying to map Cassandra rows to a parameterized type using the spark-cassandra-connector. I have been trying to define the mapping with an implicitly defined columnMapper, like so:

import scala.reflect.ClassTag

import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
import com.datastax.spark.connector.rdd.CassandraTableScanRDD
import com.datastax.spark.connector.rdd.reader.RowReaderFactory

class Foo[T <: Bar : ClassTag : RowReaderFactory] {
  // Intended mapping: bean property "timestamp" -> column "ts".
  implicit object Mapper extends JavaBeanColumnMapper[T](
    Map("id" -> "id",
        "timestamp" -> "ts"))

  def doSomeStuff(operations: CassandraTableScanRDD[T]): Unit = {
    println("do some stuff here")
  }
}
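
For reference, a minimal sketch of the kind of call site involved (the SparkContext plumbing and the keyspace/table names are assumptions, not part of the original post):

import com.datastax.spark.connector._
import org.apache.spark.SparkContext

// Hypothetical call site: the RowReaderFactory[Bar] context bound is
// resolved here, where Foo is instantiated -- not inside Foo itself.
// The Mapper defined inside the class is therefore not in implicit scope,
// so the connector falls back to its DefaultColumnMapper, which fails at
// runtime once the RDD is evaluated (see the stack trace below).
def run(sc: SparkContext): Unit = {
  val foo = new Foo[Bar]
  val rdd = sc.cassandraTable[Bar]("my_keyspace", "my_table") // placeholder names
  foo.doSomeStuff(rdd)
}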

However, I am getting the following error, which I believe is because I am passing in a RowReaderFactory without correctly specifying the mapping for it. Any idea how to specify the mapping information for the RowReaderFactory?

Exception in thread "main" java.lang.IllegalArgumentException: Failed to map constructor parameter timestamp in Bar to a column of MyNamespace
    at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$4$$anonfun$apply$1.apply(DefaultColumnMapper.scala:78)
    at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$4$$anonfun$apply$1.apply(DefaultColumnMapper.scala:78)
    at scala.Option.getOrElse(Option.scala:120)
    at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$4.apply(DefaultColumnMapper.scala:78)
    at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$4.apply(DefaultColumnMapper.scala:76)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForReading(DefaultColumnMapper.scala:76)
    at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.<init>(GettableDataToMappedTypeConverter.scala:56)
    at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:23)
    at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:48)
    at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:43)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:48)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:147)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)

Best Answer

You can define that implicit in the companion object of the class being mapped, Bar, like so:

object Bar {
  implicit object Mapper extends JavaBeanColumnMapper[Bar](
    Map("id" -> "id",
        "timestamp" -> "ts"))
}
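
For this mapper to work, Bar needs JavaBean-style getters and setters whose property names match the keys in the map above. A sketch of what that might look like (the field types are assumptions, not from the original post):

// Hypothetical shape of Bar. JavaBeanColumnMapper reflects over JavaBean
// getters/setters, so property "id" maps to column "id" and property
// "timestamp" maps to column "ts" via the override map above.
class Bar extends Serializable {
  private var id: String = _                // assumed type
  private var timestamp: java.util.Date = _ // assumed type

  def getId: String = id
  def setId(id: String): Unit = { this.id = id }

  def getTimestamp: java.util.Date = timestamp
  def setTimestamp(ts: java.util.Date): Unit = { this.timestamp = ts }
}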

When Scala tries to find an implicit instance for a class, it looks in that class's companion object. If you need to, you can instead define the implicit in scope at the point where it is required, but putting it in the companion object means you do not have to repeat it everywhere it is needed.
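
To illustrate the lookup rule itself, here is a small self-contained sketch, unrelated to the connector (all names are invented for illustration):

// Minimal demonstration of companion-object implicit lookup.
trait Show[A] { def show(a: A): String }

case class Point(x: Int, y: Int)

object Point {
  // Lives in Point's companion object, so it is found whenever an
  // implicit Show[Point] is required -- no import needed at the call site.
  implicit val pointShow: Show[Point] = new Show[Point] {
    def show(p: Point): String = s"(${p.x}, ${p.y})"
  }
}

def render[A: Show](a: A): String = implicitly[Show[A]].show(a)

render(Point(1, 2)) // returns "(1, 2)", resolved via Point's companion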

Regarding scala - mapping Cassandra rows to a parameterized type in a Spark RDD, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38154437/
