apache-spark - Elasticsearch index is created but messages do not appear

Tags: apache-spark elasticsearch apache-kafka spark-streaming

I am trying to read a stream of messages from Kafka and push them to Elasticsearch with Spark. Spark receives the messages from Kafka as `df` (disk usage) output for various systems, generates a message for each line depending on its usage, and pushes it to Elasticsearch. The problem I am hitting is that the index gets created, but the messages never show up in Elasticsearch. I am new to this.
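For reference, each Kafka message is expected to look like a line of standard `df` output (the values below are only illustrative). That is why the code filters out the "Filesystem" header line, splits on whitespace, and reads the Use% column as the second-to-last field and the filesystem name as the sixth-to-last field:

Filesystem     1K-blocks    Used Available Use% Mounted on
/dev/sda1       41152736 8123456  31139936  21% /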

package rnd
import com.sun.rowset.internal.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._
object WordFind {
  def main(args: Array[String]) {
  }
  import org.apache.spark.SparkConf
  val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
  val sc = new SparkContext(conf)
  //val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.Seconds
  val batchIntervalSeconds = 2
  //val ssc = new StreamingContext(conf, Seconds(10))
  import org.apache.spark.streaming.kafka.KafkaUtils
  import org.apache.spark.streaming.dstream.ReceiverInputDStream
  val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
  val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
    "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))
  import org.apache.spark.streaming.dstream.DStream
  val filteredStream: DStream[Array[String]] = kafkaStream
    .filter(!_._2.contains("Filesystem")) // eliminate header
    .map(_._2.split("\\s+")) // split with space
  val outputDStream: DStream[String] = filteredStream.map {
    row =>
      val useIdx = row.length - 2
      val useSystemInfo = row.length - 6
      // if Use%>70 for any case> Message: Increase ROM size by 20%
      // if Use%<30% for any case> Message: Decrease ROM size by 25%
      val sysName = row(useSystemInfo).toString
      val usePercent = row(useIdx).replace("%", "").toInt
      usePercent match {
        case x if x > 70 => sysName + " Increase ROM size by 20%"
        case x if x < 30 => sysName + "Decrease ROM size by 25%"
        case _ => "Undefined"
          usePercent.toString

      }
  }
  import org.elasticsearch.spark.sql._
  // outputDStream.print()

//outputDStream.print()
  val config: Map[String,String] = Map("es.index.auto.create" -> "yes")
  outputDStream.foreachRDD { messageRDD =>
    //messageRDD.saveToEs("dfvaluemessage_v1/km")
    messageRDD.saveToEs("dfvaluemessage_v1/km", config)
  }
  //outputDStream.foreachRDD{messageRDD =>
    //messageRDD.saveToEs("dfvaluemessage_v1/km")
  //}
  //outputDStream.saveToEs("kafkawordcount_v1/kwc")
  // To make sure data is not deleted by the time we query it interactively
  ssc.remember(Minutes(1))
  //ssc.checkpoint(checkpointDir)
  ssc
  //    }
  // This starts the streaming context in the background.
  ssc.start()
  // This is to ensure that we wait for some time before the background streaming job starts. This will put this cell on hold for 5 times the batchIntervalSeconds.
  ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
}

The Elasticsearch output looks like this:

(screenshot of the Elasticsearch output)
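To check whether any documents actually landed in the index (as opposed to the index merely existing), a count query can be run against Elasticsearch. A minimal sketch, assuming Elasticsearch is on its default localhost:9200 and using the index name from the code above:

import scala.io.Source

// Hits the _count endpoint of the index used in the job; prints something like
// {"count":0,...} when the index exists but holds no documents.
val countJson = Source.fromURL("http://localhost:9200/dfvaluemessage_v1/_count").mkString
println(countJson)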

Best Answer

The problem was that I was calling saveToEs on the raw RDD inside foreachRDD to push the messages to Elasticsearch. That only worked once I converted each RDD to a DataFrame first. I made that change and it works fine.
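The core of the change is this (a minimal sketch extracted from the full listing below; it assumes a SQLContext and its implicits are in scope):

// Before: saving the RDD[String] directly -- the index was created but stayed empty
// outputDStream.foreachRDD { rdd => rdd.saveToEs("dfvaluemessage_v1/km", config) }

// After: turn each micro-batch into a single-column DataFrame, then save that
outputDStream.foreachRDD { messageRDD =>
  val df = messageRDD.toDF("messages")        // needs import sqlContext.implicits._
  df.saveToEs("dfvaluemessage_v1/km", config) // from org.elasticsearch.spark.sql._
}

The full listing follows.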

package rnd
import com.sun.rowset.internal.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._
object WordFind {
  def main(args: Array[String]) {
  }
  import org.apache.spark.SparkConf
  val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
  val sc = new SparkContext(conf)
  //val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.Seconds
  val batchIntervalSeconds = 2
  //val ssc = new StreamingContext(conf, Seconds(10))
  import org.apache.spark.streaming.kafka.KafkaUtils
  import org.apache.spark.streaming.dstream.ReceiverInputDStream
  val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
  val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
    "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))
  import org.apache.spark.streaming.dstream.DStream
  val filteredStream: DStream[Array[String]] = kafkaStream
    .filter(!_._2.contains("Filesystem")) // eliminate header
    .map(_._2.split("\\s+")) // split with space
  val outputDStream: DStream[String] = filteredStream.map {
    row =>
      val useIdx = row.length - 2
      val useSystemInfo = row.length - 6
      // if Use%>70 for any case> Message: Increase ROM size by 20%
      // if Use%<30% for any case> Message: Decrease ROM size by 25%
      val sysName = row(useSystemInfo).toString
      val usePercent = row(useIdx).replace("%", "").toInt
      usePercent match {
        case x if x > 70 => sysName + " Increase ROM size by 20%"
        case x if x < 30 => sysName + "Decrease ROM size by 25%"
        case _ => "Undefined"
          usePercent.toString

      }
  }
  import org.elasticsearch.spark.sql._
  // outputDStream.print()

//outputDStream.print()
  val config: Map[String,String] = Map("es.index.auto.create" -> "yes")
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  outputDStream.foreachRDD { messageRDD =>
    // convert the RDD of message strings into a single-column DataFrame before indexing
    val df = messageRDD.toDF("messages")
    //messageRDD.saveToEs("dfvaluemessage_v1/km")
    df.saveToEs("dfvaluemessage_v1/km", config)
  }
  //outputDStream.foreachRDD{messageRDD =>
    //messageRDD.saveToEs("dfvaluemessage_v1/km")
  //}
  //outputDStream.saveToEs("kafkawordcount_v1/kwc")
  // To make sure data is not deleted by the time we query it interactively
  ssc.remember(Minutes(1))
  //ssc.checkpoint(checkpointDir)
  ssc
  //    }
  // This starts the streaming context in the background.
  ssc.start()
  // This is to ensure that we wait for some time before the background streaming job starts. This will put this cell on hold for 5 times the batchIntervalSeconds.
  ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
}
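For completeness: the elasticsearch-spark connector can also index an RDD directly, without going through a DataFrame, as long as the elements are Maps (or case classes) rather than raw strings. This is not what the answer above does, just an alternative sketch using the same index and config:

import org.elasticsearch.spark._  // adds saveToEs to RDDs

outputDStream.foreachRDD { messageRDD =>
  // wrap every message string in a Map so the connector can serialize it as a JSON document
  val docs = messageRDD.map(msg => Map("messages" -> msg))
  docs.saveToEs("dfvaluemessage_v1/km", config)
}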

A similar question on this topic (apache-spark - Elasticsearch index is created but messages do not appear) can be found on Stack Overflow: https://stackoverflow.com/questions/48807530/
