我有一个 Spark 流作业,它将从 kafka 读取并通过 Http 请求写入弹性。
我想验证来自 Kafka 的每个请求并根据业务需要更改有效负载并写入 Elastic Search。
我已经使用 ES Http Request 将数据推送到 Elastic Search 中。有人可以指导我如何通过数据框将数据写入 ES 吗?
代码片段:
val dfInput = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("group.id", sourceTopicGroupId)
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
.load()
import spark.implicits._
val resultDf = dfInput
.withColumn("value", $"value".cast("string"))
.select("value")
resultDf.writeStream.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = true
override def process(value: Row): Unit = {
processEventsData(value.get(0).asInstanceOf[String], deviceIndex, msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
}
override def close(errorOrNull: Throwable): Unit = {
}
}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination() //"1 second"
}
这样我们就无法实现性能。有什么办法吗?
最佳答案
您可以使用 elasticsearch-spark-20_2.11
,很简单,想了解更多es-hadoop
EsSpark.saveJsonToEs(rdd, index, conf)
关于apache-spark - Spark Streaming Kafka 到 ES,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64210690/