apache-spark - Spark Streaming Kafka 到 ES

标签 apache-spark elasticsearch spark-structured-streaming

我有一个 Spark 流作业,它将从 kafka 读取并通过 Http 请求写入弹性。
我想验证来自 Kafka 的每个请求并根据业务需要更改有效负载并写入 Elastic Search。
我已经使用 ES Http Request 将数据推送到 Elastic Search 中。有人可以指导我如何通过数据框将数据写入 ES 吗?
代码片段:

val dfInput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("group.id", sourceTopicGroupId)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

import spark.implicits._

val resultDf = dfInput
  .withColumn("value", $"value".cast("string"))
  .select("value")

resultDf.writeStream.foreach(new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
    processEventsData(value.get(0).asInstanceOf[String], deviceIndex, msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
  }

  override def close(errorOrNull: Throwable): Unit = {
  }
}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination() //"1 second"
}
这样我们就无法实现性能。
有什么办法吗?
  • Spark 版本 2.3.2
  • 卡夫卡分区 20
  • ES 版本 7.7.0
  • 最佳答案

    您可以使用 elasticsearch-spark-20_2.11 ,很简单,想了解更多es-hadoop

    EsSpark.saveJsonToEs(rdd, index, conf)
    

    关于apache-spark - Spark Streaming Kafka 到 ES,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64210690/

    相关文章:

    scala - Spark Structured Streaming 从查询异常中恢复

    scala - 如何使用结构化流从 Kafka 读取 JSON 格式的记录?

    apache-spark - Spark UDF错误-不支持类型为Any的架构

    apache-spark - 使用 Spark 读取 Azure Synapse 表

    python-3.x - 装满 Spark 数据帧-pyspark

    ruby-on-rails - 本地开发环境限制Elasticsearch内存使用的最佳实践

    apache-spark - 启动 Spark History Server 时如何指定 Spark 属性?

    python - Elasticsearch HTTPConnectionPool(主机='127.0.0.1',端口=9200): Max retries exceeded

    elasticsearch - 除了使用Elasticsearch恢复API之外,还有什么方法可以恢复Elasticsearch快照?

    apache-spark - 如何创建自定义流数据源?