cassandra - Spark-streaming:如何将流数据输出到cassandra

标签 cassandra apache-spark spark-streaming

我正在使用 Spark-streaming 读取 kafka 流消息。 现在我想将 Cassandra 设置为我的输出。 我在 cassandra“test_table”中创建了一个表,其中包含“key:text 主键”和“value:text”列 我已成功将数据映射到 JavaDStream<Tuple2<String,String>> data像这样:

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< Tuple2<String,String>, Tuple2<String,String> >() 
{
    public Tuple2<String,String> call(Tuple2<String, String> message)
    {
        return new Tuple2<String,String>( message._1(), message._2() );
    }
}
);  

然后我创建了一个列表:

List<TestTable> list = new ArrayList<TestTable>();

其中 TestTable 是我的自定义类,其结构与我的 Cassandra 表相同,成员为“key”和“value”:

class TestTable
{
    String key;
    String val;

    public TestTable() {}

    public TestTable(String k, String v)
    {
        key=k;
        val=v;
    }

    public String getKey(){
        return key;
    }

    public void setKey(String k){
        key=k;
    }

    public String getVal(){
        return val;
    }

    public void setVal(String v){
        val=v;
    }

    public String toString(){
        return "Key:"+key+",Val:"+val;
    }
}

请建议一种如何添加 JavaDStream<Tuple2<String,String>> data 中的数据的方法进入List<TestTable> list 。 我这样做是为了随后可以使用

JavaRDD<TestTable> rdd = sc.parallelize(list); 
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table"); 

将 RDD 数据保存到 Cassandra 中。

我尝试过这样编码:

messages.foreachRDD(new Function<Tuple2<String,String>, String>()
                        {
                            public List<TestTable> call(Tuple2<String,String> message)
                            {
                                String k = message._1();
                                String v = message._2();
                                TestTable tbl = new TestTable(k,v);
                                list.put(tbl);
                            }
                        }
                    );

但似乎发生了某种类型不匹配。 请帮忙。

最佳答案

假设该程序的目的是将kafka中的流数据保存到Cassandra中,则无需转储JavaDStream<Tuple2<String,String>>数据写入List<TestTable>列表。

DataStax 的 Spark-Cassandra 连接器直接通过 Spark Streaming extensions 支持此功能。 。

JavaDStream 上使用此类扩展应该足够了:

javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

而不是在中间列表上耗尽数据。

关于cassandra - Spark-streaming:如何将流数据输出到cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27318202/

相关文章:

Cassandra 不同计数

cassandra - 如何在 Cassandra 中将 Unix 10 位纪元时间存储和查询为人类可读的内容?

Java Spark DataFrame 连接包含数组的列

apache-spark - 阅读最新的 spark kafka 流媒体

scala - 按分隔符分割 Spark 流

scala - Spark消费者不读取Kafka生产者消息Scala

elasticsearch - cassandra vs Elasticsearch vs任何其他设计建议

cassandra - Docker 与 DataStax 连接无法正常工作

scala - 无法从 HDFS 读取文件

apache-spark - Spark 流式传输错误 : Accumulator must be registered before send to executor