scala - Streaming Kafka messages to Elasticsearch in real time with Apache Spark Streaming

Tags: scala apache-spark elasticsearch apache-spark-sql spark-streaming

Good morning everyone,

I am a beginner with Scala and Spark Streaming. My use case is to load a stream from Kafka into Spark Streaming and then into Elasticsearch. Here is my code:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object KAFKAStreaming {

  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092"
    val groupid = "GRP1"
    val topics = "producer"

    val conf = new SparkConf().setMaster("local[*]").setAppName("test")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "http://localhost:9200")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    sc.setLogLevel("OFF")

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicSet, kafkaParams)
    )
    // the stream:
    // na1;03/04/2020 10:35;23
    // na2;04/04/2020 10:35;15
    // na1;05/04/2020 10:35;20
    // na2;06/04/2020 10:35;12
    // na1;07/04/2020 10:35;40

    val line = messages.map(_.value)
    val name = line.map(a => ("name" -> a.split(";")(0)))
    val timestamp = line.map(a => ("timestamp" -> a.split(";")(1)))
    val value = line.map(a => ("value" -> a.split(";")(2).toInt))

    sc.makeRDD(Seq(name, timestamp, value)).saveToEs("spark/docs")

    ssc.start()
    ssc.awaitTermination()
  }
}

I get the following error message: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 2, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
Serialization stack:


I know this is a serialization problem, but I don't know how to fix it.

Full error trace:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 2, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. Serialization stack:


at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:274)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:108)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:79)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:74)
at org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:55)
at KAFKAStreaming$.main(KAFKAStreaming.scala:50)
at KAFKAStreaming.main(KAFKAStreaming.scala)

Best Answer

Your code has a couple of misconceptions about how the RDDs are created and how they are persisted to Elasticsearch:

sc.makeRDD(Seq(name,timestamp,value)).saveToEs("spark/docs")

In your code, name, timestamp and value are already DStreams (streams of RDDs), so wrapping them in another RDD with sc.makeRDD and trying to serialize them makes no sense; that is what triggers the NotSerializableException.

What you want to do instead is:
name.saveToEs("spark/names")
timestamp.saveToEs("spark/timestamps")
value.saveToEs("spark/values")
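
One caveat: org.elasticsearch.spark._ only adds saveToEs to RDDs. To call it directly on a DStream as above, the streaming support of the connector has to be imported as well (assuming the usual elasticsearch-spark / ES-Hadoop artifact is on the classpath):

import org.elasticsearch.spark.streaming._   // adds saveToEs to DStream[T]

Alternatively, write each micro-batch with foreachRDD and org.elasticsearch.spark.rdd.EsSpark, which is the code path already visible in your stack trace.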

If you want all the fields in a single collection, build a case class around the message received from the stream and save that enriched record, rather than saving each field as a separate RDD to ES (see the sketch below).
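
A minimal sketch of that approach, assuming the same ES-Hadoop connector. The case class name Record and the index spark/docs are just placeholders; EsSpark.saveToEs is called per micro-batch via foreachRDD (the same API that appears in your stack trace):

// Defined at top level (outside main) so instances serialize cleanly
case class Record(name: String, timestamp: String, value: Int)

// Inside main, after createDirectStream:
// parse each "na1;03/04/2020 10:35;23" line into one Record per message
val records = messages.map(_.value).map { a =>
  val fields = a.split(";")
  Record(fields(0), fields(1), fields(2).toInt)
}

// write every micro-batch as whole documents instead of three separate streams
import org.elasticsearch.spark.rdd.EsSpark
records.foreachRDD(rdd => EsSpark.saveToEs(rdd, "spark/docs"))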

Regarding scala - Streaming Kafka messages to Elasticsearch in real time with Apache Spark Streaming, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61344466/
