在我们的项目中,我们使用Apache Spark写入ES。我们运行多个并行写入ES的spark作业。我们处理的数据量太大,导致写入吞吐量高达〜5K个写入/秒。
我们要限制ES写入,以使其降低到500至1000个写入/秒的范围。我们遇到过像es.batch.size.bytes
和es.batch.size.entries
这样的ES配置,但是我们不确定这些配置如何与Apache Spark配合使用。
最佳答案
使用repartition()
或更优选地使用coalesce()
在Spark中对数据进行重新分区,如果您要减少分区数,则是限制索引到ES的简单方法。
如果要在pyspark中设置属性
esconf={}
esconf["es.mapping.id"] = "_id"
esconf["es.nodes"] = "localhost"
esconf["es.port"] = "9200"
esconf["es.batch.size.bytes"] = "1000000" //default 1mb for bulk request
esconf["es.batch.size.entries"] = "1000" //default 1000 for bulk request
df.write.format("org.elasticsearch.spark.sql").options(**esconf).mode("append").save("index_name")
注意:请注意,批量大小和条目是按任务实例分配的。始终乘以Hadoop作业中的任务数,以在运行时达到Elasticsearch的总批量大小/条目。这就是您获得5K写/秒的原因
关于apache-spark - throttle 从Apache Spark写入ES,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62989672/