我正在构建一个应用程序,它将一些数据发送到我的集群。 我将此数据存储到特定的 HDFS 文件夹中,其中正在运行 Spark Streaming 应用程序。
在这个streamApp中,我将做一些快速且廉价的数据科学。 之后,我必须将结果索引到 ElasticSearch 以便为我的 AngularApp 提供数据。
一切正常,
但是...我无法使用 ES 索引我的结果。 事实是......我无法将结果 DataFrame 转换为 RDD,因为它使用一些 Dataframe Stream 作为输入
这是我的伪代码:
val schema = StructType(
StructField("id", StringType, nullable = false) ::
StructField("code", StringType, nullable = false) :: Nil)
val lines = spark.readStream
.format("json")
.schema(schema)
.load(HDFSPath)
// Do some basics stuff here
import spark.implicits._
val linesRDD = lines.rdd.map(row =>
StreamingObj(row(0).toString,row(1).toString)) // RDD[StreamingObj]
linesRDD.saveToEs("stream/stream") // ES
val linesDF= linesRDD.toDF()
val queryNode = linesDF
.writeStream
.format("console")
.outputMode(OutputMode.Append)
.trigger(Trigger.ProcessingTime(4.seconds))
.start
当我尝试将 DataFrame 转换为 RDD 时,它失败了。
我必须转换为 RDD 才能索引数据。
在lines.rdd.map上,我得到了这个。
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
是否可以在ES内部索引DataStreaming Spark?
感谢您的帮助。
尝试更简单的情况:
val lines = spark.readStream
.format("json")
.schema(schema)
.load(HDFSPATH).as[StreamingObj]
lines.writeStream
.format("org.elasticsearch.spark.sql")
.outputMode("append")
.start("index/stream")
17/12/21 15:37:55 INFO util.Version: Elasticsearch Hadoop v5.4.2 [a478aabe9e] Exception in thread "main" java.lang.UnsupportedOperationException: Data source org.elasticsearch.spark.sql does not support streamed writing
我做与文档相同的事情=> https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-sql-streaming
甚至这个例子:https://discuss.elastic.co/t/spark-structured-streaming-sink-in-append-mode/105664/4
或者这个:
https://discuss.elastic.co/t/structured-streaming-failed-to-find-data-source-es/112144
这是我的 Maven 依赖项:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.4.2</version>
</dependency>
这是好的吗?我不能使用 format('es') ,它最密集地找到它。
似乎 ES 中的 Spark 结构化流只有 > 6.0
参见https://www.elastic.co/blog/structured-streaming-elasticsearch-for-hadoop-6-0
最佳答案
问题就在这里
val linesRDD = lines.rdd.map(row =>
StreamingObj(row(0).toString,row(1).toString)) // RDD[StreamingObj]
结构化流查询中不允许转换为RDD
。您可以尝试直接编写:
lines
.writeStream
.format("org.elasticsearch.spark.sql")
...
或使用ForeachWriter
:
lines.writeStream.foreach(...)
关于apache-spark - Spark 结构化流与 ElasticSearch 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47926169/