elasticsearch - Cassandra,Spark,Elasticsearch:在kibana中流化数据以进行可视化

标签 elasticsearch cassandra streaming apache-spark

我正在尝试在Kibana中可视化来自spark的数据。但是,使用以下命令创建RRD:

    val test = sc.cassandraTable("test","data")

然后,我使用Elasticsearch和Hadoop库通过以下方式流式传输到Elasticsearch:
    EsSpark.saveToEs(test, "spark/docs", Map("es.nodes" -> "192.168.1.88"))

但我得到这个错误:
15/04/20 16:15:27 ERROR TaskSetManager: Task 0 in stage 12.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 36, 192.168.1.92): org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: Cannot handle type [class com.datastax.spark.connector.CassandraRow]

谁能指导我从Spark到Elasticsearch流式传输。有没有更好的方法来可视化来自cassandra,solr或spark的数据。我遇到了香蕉,但似乎没有发布dashabords的选项。

谢谢

最佳答案

根据Spark Cassandra Connector Guide,您可以首先定义一个案例类,然后将CassandraRow转换为案例类对象,然后将这些对象保存到Elasticsearch。以下是指南中的示例代码:

case class WordCount(w: String, c: Int)

object WordCount { 
    implicit object Mapper extends DefaultColumnMapper[WordCount](
        Map("w" -> "word", "c" -> "count")) 
}

sc.cassandraTable[WordCount]("test", "words").toArray
// Array(WordCount(bar,20), WordCount(foo,10))

sc.parallelize(Seq(WordCount("baz", 30), WordCount("foobar", 40)))
  .saveToCassandra("test", "words", SomeColumns("word", "count"))

关于elasticsearch - Cassandra,Spark,Elasticsearch:在kibana中流化数据以进行可视化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29742695/

相关文章:

考虑到 Cassandra 的 MySQL 应用程序

cassandra - 如何配置 DBeaver 和 Cassandra

java - 使用 Java 将图像拼接在网格中

ElasticsearchClientException : Request failed to execute. 调用:状态码 403 来自:POST/index/

elasticsearch - 如何在elasticsearch中配置discovery.type?

elasticsearch - 如何在Fluent Bit中处理间歇性断开连接并通过HTTP发送正确的时间戳?

Cassandra : OperationTimedOut: errors={}, 最后主机=127.0.0.1

amazon-web-services - 向AWS ELB后面的所有计算机发送请求

windows-phone-7 - Windows Phone 7 上的 MediaElement 和 AAC 流支持

iphone - iOS 巨大的 JSON (30MB) 处理