apache-spark - Spark 结构化流与 ElasticSearch 集成

标签 apache-spark elasticsearch spark-structured-streaming

我正在构建一个应用程序,它将一些数据发送到我的集群。 我将此数据存储到特定的 HDFS 文件夹中,其中正在运行 Spark Streaming 应用程序。

在这个streamApp中,我将做一些快速且廉价的数据科学。 之后,我必须将结果索引到 ElasticSearch 以便为我的 AngularApp 提供数据。

一切正常,

但是...我无法使用 ES 索引我的结果。 事实是......我无法将结果 DataFrame 转换为 RDD,因为它使用一些 Dataframe Stream 作为输入

这是我的伪代码:

val schema = StructType(
    StructField("id", StringType, nullable = false) ::
    StructField("code", StringType, nullable = false) :: Nil)


val lines = spark.readStream
  .format("json")
  .schema(schema)
  .load(HDFSPath)

// Do some basics stuff here

import spark.implicits._

val linesRDD = lines.rdd.map(row => 
        StreamingObj(row(0).toString,row(1).toString))  // RDD[StreamingObj]
linesRDD.saveToEs("stream/stream")           // ES
val linesDF= linesRDD.toDF()

val queryNode = linesDF
    .writeStream
    .format("console")
    .outputMode(OutputMode.Append)
    .trigger(Trigger.ProcessingTime(4.seconds))
    .start

当我尝试将 DataFrame 转换为 RDD 时,它失败了。

我必须转换为 RDD 才能索引数据。

在lines.rdd.map上,我得到了这个。

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

是否可以在ES内部索引DataStreaming Spark?

感谢您的帮助。

尝试更简单的情况:

   val lines = spark.readStream
      .format("json")
      .schema(schema)
      .load(HDFSPATH).as[StreamingObj]

  lines.writeStream
      .format("org.elasticsearch.spark.sql")
      .outputMode("append")
      .start("index/stream")

17/12/21 15:37:55 INFO util.Version: Elasticsearch Hadoop v5.4.2 [a478aabe9e] Exception in thread "main" java.lang.UnsupportedOperationException: Data source org.elasticsearch.spark.sql does not support streamed writing

我做与文档相同的事情=> https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-sql-streaming

甚至这个例子:https://discuss.elastic.co/t/spark-structured-streaming-sink-in-append-mode/105664/4

或者这个:

https://discuss.elastic.co/t/structured-streaming-failed-to-find-data-source-es/112144

这是我的 Maven 依赖项:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>5.4.2</version>
</dependency>

这是好的吗?我不能使用 format('es') ,它最密集地找到它。

似乎 ES 中的 Spark 结构化流只有 > 6.0

参见https://www.elastic.co/blog/structured-streaming-elasticsearch-for-hadoop-6-0

最佳答案

问题就在这里

val linesRDD = lines.rdd.map(row => 
        StreamingObj(row(0).toString,row(1).toString))  // RDD[StreamingObj]

结构化流查询中不允许转换为RDD。您可以尝试直接编写:

lines
  .writeStream
  .format("org.elasticsearch.spark.sql")
  ...

或使用ForeachWriter:

lines.writeStream.foreach(...) 

关于apache-spark - Spark 结构化流与 ElasticSearch 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47926169/

相关文章:

java - Spark 和 Cassandra : requirement failed: Columns not found in class com. datastax.spark.connector.japi.CassandraRow : [mycolumn. ..]

hadoop - 在虚拟盒集群上部署CDH5?

apache-spark - 如何在 Spark 3.0 预览版中使用 Delta?

scala - 具有定期更新的静态数据集的结构化流

scala - 即使设置了 auto.offset.reset=latest,Spark 结构化流查询也始终以 auto.offset.rest=earliest 开头

apache-spark - 如何删除超过 X 天/年的 Databricks 数据?

scala - 从Apache Spark填充Elasticsearch日期

elasticsearch - 存储桶中的弹性脚本和更高级别的聚合

python - 如何获取所有包含elasticsearch-dsl查询关键字的结果?

java - SparkSession 初始化抛出 ExceptionInInitializerError