scala - saveAsCassandraTable 不是 ....dstream.DStream[(String, Int, String)] 的成员

标签 scala apache-spark cassandra

当我尝试使用 saveAsCassandraTable 将实时推文加载到 Cassandra 时,出现以下错误

值 saveAsCassandraTable 不是 org.apache.spark.streaming.dstream.DStream[(String, Int, String)] 的成员

不过,我能够使用 saveToCassandra 成功导出。到目前为止我可以组合的代码如下 -

val tags = stream.map(_.getText).filter(_.startsWith("@xyz"))     
val Counts  =   tags.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowLength, slideInterval)    
val CountsAll   =   Counts.map{case (tag,   counter) => (tag,   counter, "Everything")}    
CountsAll.saveAsCassandraTable("demo1", "tags1")

我的要求是 scala 同时将推文加载到 Cassandra 中的两个不同表中。 一种是 Cassandra 中的预定义表,另一种是在每次加载推文时动态创建一个表。推文以 2000 秒的间隔加载。

感谢有关如何执行saveAsCassandraTable的任何建议

最佳答案

就像@user6910411所说,您的错误告诉您您正在尝试在DStream上调用saveAsCassandraTable。 您只能在 RDD[T] 上使用它,在您的情况下RDD[(String, Int, String)]

你想要这样的东西:

CountsAll.foreachRDD{ rdd =>
   rdd.saveAsCassandraTable("demo1", "tags1")
}

更新

RDD API不支持截断等。 您可能应该考虑在 Spark 代码路径之外管理架构。 但是,要直接解决您的要求:

val conn: CassandraConnector = CassandraConnector(sparkConf())
conn.withSessionDo { session =>
      session.execute(s"""CREATE TABLE tags1..."")
      }
    }

其中 sparkConf() 返回带有 Cassandra 设置的 Spark 配置对象。 .withSessionDo 为您提供来自 DataStax C* 驱动程序的 Session 对象,以便您可以用它做任何您想做的事情。 再次强调,我建议架构管理不要直接在 Spark 代码路径上完成,而是独立管理。

关于scala - saveAsCassandraTable 不是 ....dstream.DStream[(String, Int, String)] 的成员,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43687739/

相关文章:

hadoop - 为什么 Guava 在我的 build.sbt 中没有正确着色?

scala - 如何在 Spark Udf 中传递 map ?

scala - 如何合并或跳过 Scala Actor 中的重复消息?

apache-spark - Pyspark ML - 如何保存管道和 RandomForestClassificationModel

java - 有没有办法将xml文件数据插入cassandra?

Cassandra json2sstable 空指针异常

java - Guice 在 scala 中注入(inject)注释

scala - 如何将任意数字转换为 double ?

java - Spark Java 使用数学运算来获取具有最大截止值的值比例

mysql - Spark 应用程序中的 Hive 中的 "Unable to alter partition"