mongodb - 为什么 Mongo Spark 连接器为查询返回不同且不正确的计数?

标签 mongodb apache-spark pyspark pyspark-sql

我正在为一个项目评估 Mongo Spark 连接器,但结果不一致。我在笔记本电脑上本地使用 MongoDB 服务器版本 3.4.5、Spark(通过 PySpark)版本 2.2.0、Mongo Spark 连接器版本 2.11;2.2.0。对于我的测试数据库,我使用安然数据集 http://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/我对 Spark SQL 查询很感兴趣,当我开始运行简单的计数测试查询时,每次运行都会收到不同的计数。 这是我的 mongo shell 的输出:

> db.messages.count({'headers.To': 'eric.bass@enron.com'})
203

以下是我的 PySpark shell 的一些输出:

In [1]: df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/enron_mail.messages").load()
In [2]: df.registerTempTable("messages")
In [3]: res = spark.sql("select count(*) from messages where headers.To='eric.bass@enron.com'")
In [4]: res.show()
+--------+                                                                      
|count(1)|
+--------+
|     162|
+--------+
In [5]: res.show()
+--------+                                                                      
|count(1)|
+--------+
|     160|
+--------+
In [6]: res = spark.sql("select count(_id) from messages where headers.To='eric.bass@enron.com'")
In [7]: res.show()
+----------+                                                                    
|count(_id)|
+----------+
|       161|
+----------+
In [8]: res.show()
+----------+                                                                    
|count(_id)|
+----------+
|       162|
+----------+

我在 Google 中搜索了有关此问题的信息,但没有发现任何有用的信息。如果有人对为什么会发生这种情况以及如何正确处理有任何想法,请分享您的想法。我有一种感觉,可能是我遗漏了什么,或者某些配置不正确。

更新: 我解决了我的问题。计数不一致的原因是 MongoDefaultPartitioner 包装了使用随机抽样的 MongoSamplePartitioner。老实说,这对我来说是一个非常奇怪的默认设置。我个人更喜欢使用缓慢但一致的分区器。分区器选项的详细信息可以在官方configuration options中找到。文档。

更新: 将解决方案复制到答案中。

最佳答案

我解决了我的问题。计数不一致的原因是 MongoDefaultPartitioner 包装了使用随机抽样的 MongoSamplePartitioner。老实说,这对我来说是一个非常奇怪的默认设置。我个人更喜欢使用缓慢但一致的分区器。分区器选项的详细信息可以在官方configuration options中找到。文档。

代码:

val df = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", "mongodb://127.0.0.1/enron_mail.messages")
  .option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
  .load()

关于mongodb - 为什么 Mongo Spark 连接器为查询返回不同且不正确的计数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46652026/

相关文章:

斯卡拉/ Spark : How to convert List of values into separate rows?

sockets - 异常: could not open socket on pyspark

apache-spark - PySpark:将一个数据帧中的数组值与另一个数据帧中的数组值进行比较以获得交集

ruby-on-rails - 无法使用 rails generate 生成 mongoid 配置文件

mongodb - $exists 查询比扫描整个集合更快吗?

python - Spark .stdev() Python 问题

arrays - Pyspark:过滤行内数组的内容

Python 与 Scala(用于 Spark 作业)

java - 如何在 Java 8 Stream Filter 中基于子文档过滤 Mongo 文档

MongoDB - 在另一个字段上使用 $nin