java - 使用 Datastax Spark Cassandra 连接器将 PairDStram 写入 cassandra

标签 java cassandra apache-spark spark-streaming

我需要使用 Java 和 Datastax Spark Cassandra Connector 将过滤流的数据写入 cassandra。

我关注了 datastax java documentation .

文档解释了如何将 RDD 写入 cassandra,但没有说明如何写入 Dstream。

我需要能够保存一个 PairDStream,但我不知道该怎么做,因为所有示例都是用 scala 编写的。

我需要把下面用scala写的代码转成java的代码:

val wc = stream.flatMap(_.split("\\s+"))
    .map(x => (x, 1))
    .reduceByKey(_ + _)
    .saveToCassandra("streaming_test", "words", SomeColumns("word", "count")) 

基本上我必须保存一个JavaPairDStream<String, Integer>

最佳答案

解决方案,以防万一有人在寻找答案

要将DStream或JavaDStream写入Cassandra,需要导入:

import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;

并使用javaFunctions(DStream<T> arg0)javaFunctions(JavaDStream<T> arg0)

关于java - 使用 Datastax Spark Cassandra 连接器将 PairDStram 写入 cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29173117/

相关文章:

apache-spark - Spark 嵌套 foreach

java - IntelliJ - IntelliJ 运行配置忽略 maven.config 和 jvm.config 文件

java - 在 Java 中传递自定义对象

cassandra - 无法达到一致性级别 ONE : info={ 'required_replicas' : 1, 'alive_replicas' : 0, 'consistency' : 1}

apache-spark - 如何将 Pyspark 连接到在 docker 上运行的 datastax Cassandra?

apache-spark - ValueError : Cannot convert column into bool

java - 我可以使用什么来代替 Java 中的 Vector?

java - 自定义日期反序列化: Jackson

Cassandra 压缩宽行大分区

hadoop - Apache可以在每个节点上 Spark 缓存吗?