python - 如何在Spark中使用ElasticSearch在脚本文档中更新或部分更新?

标签 python scala elasticsearch spark-streaming spark-structured-streaming

我在python中有一个伪代码,可以从Kafka流中读取并在Elasticsearch中增加文档的数量(如果文档已经存在,则递增view的计数器)。

for message in consumer:

    msg = json.loads(message.value)
    print(msg)
    index = INDEX_NAME
    es_id = msg["id"]
    script = {"script":"ctx._source.view+=1","upsert" : msg}
    es.update(index=index, doc_type="test", id=es_id, body=script)

由于我想在分布式环境中使用它,因此我使用Spark结构化流
df.writeStream \
.format("org.elasticsearch.spark.sql")\
.queryName("ESquery")\
.option("es.resource","credentials/url") \
.option("checkpointLocation", "checkpoint").start()

或从KafkaStream读取的Scala中的SparkStreaming:
// Initializing Spark Streaming Context and kafka stream
sparkConf.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
[...] 
val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicsSet, kafkaParams)
    )

[...]
val urls = messages.map(record => JsonParser.parse(record.value()).values.asInstanceOf[Map[String, Any]])
urls.saveToEs("credentials/credential")
.saveToEs(...)elastic-hadoop.jar记录的here的API。不幸的是,this repo并没有得到很好的记录。因此,我不知道可以将脚本命令放在哪里。

有没有人可以帮助我?先感谢您

最佳答案

您应该能够通过将写入模式设置为“更新”(或upsert)并将脚本传递为“脚本”(取决于ES版本)来做到这一点。

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id", "es.write.operation" -> "update","es.update.script.inline" -> "your script" , ))

可能您想使用“upsert”

在同一库中有一些不错的unit tests in cascading integration。这些设置应适用于 Spark ,因为两者都使用相同的编写器。

我建议阅读单元测试以为您的ES版本选择正确的设置。

关于python - 如何在Spark中使用ElasticSearch在脚本文档中更新或部分更新?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47304316/

相关文章:

python - 如何使用 APScheduler 设置小时范围和分钟间隔

python - ValueError : Input shapes do not overlap raster. Geopandas/Rasterio,屏蔽时可能出现 CRS 错误

elasticsearch - 备份和恢复 elasticsearch - elasticdump?

elasticsearch - 删除HTTP输入插件生成的标题字段

python - 使用命名值通过 pyparsing 获取字符串中的标记位置

scala - 如何在外部 spark shuffle 服务上修复 "Error opening block StreamChunkId"

scala - 在 Scala 中,命名相当于 ORM 类的案例类的最惯用方法是什么?

Scala toString : parenthesize or not?

linux - 如何使用Shell脚本将日志文件转换为JSON格式?

python - 如何使用 Python delta-rs 从 Azure Blob 存储中读取数据