Mongodb-hadoop-connector 会创建很多分区,即使我向它添加查询

标签 mongodb hadoop apache-spark

我有一个非常大的 mongo 表,我想使用 spark 对其进行一些分析,它太大了,我不想加载整个数据库。但看起来它总是扫描整个数据库并将它们分成大量的分区,即使我将 mongo.input.query 传递给它也是如此。我正在使用 mongo-hadoop加载它,我的代码如下所示:

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

val mongoConfig = new Configuration()

val beginDate = new Date(2016 - 1900,6,7)

println("the begin data is: =========== >" + beginDate)

val beginId = new ObjectId(beginDate, 0,0.toShort,0)

mongoConfig.set("mongo.input.uri",
    "mongodb://mymongoduri/mongodb.mongocollection")

val queryStr = """{"_id": {"$gt" : {"$oid":"beginDate" }}}""".replace("beginDate", beginId.toString)
mongoConfig.set("mongo.input.query", queryStr)
mongoConfig.set("mongo.input.fields", """{ "its.src":-1, "its._id":-1, "its.cid": -1}""")

val documents = sc.newAPIHadoopRDD(
  mongoConfig,                // Configuration
  classOf[MongoInputFormat],  // InputFormat
  classOf[Object],            // Key type
  classOf[BSONObject])        // Value type

val OUTPUT_PATH = if(ENV == Some("dev")){
  s"./result"
} else{
  s"s3://${OUTPUT_BUCKET}/output/graph/${beginDate}"
}

documents.saveAsNewAPIHadoopFile(
  OUTPUT_PATH,
  classOf[Object],
  classOf[BSONObject],
  classOf[BSONFileOutputFormat[Object, BSONObject]]
)

它最终在 s3 中产生了大量的空文件,这不是我预期的结果(而且它浪费了很多钱)。

我已经阅读了文档,它说 mongo.input.query使用查询过滤输入集合,我可以只加载查询的数据吗?不只是过滤它们。

或者,我可以只存储那些非空的分区吗?

最佳答案

mongo 的 spark hadoop 连接器总是读取整个集合并相应地进行分区,然后使用输入查询过滤对象。当您保存文档 RDD 时,它总是会保存分区,无论它是否为空。

您可以将 RDD 重新分区为 1。或者使用 documents.coalesce(1).saveAsNewAPIHadoopFile(....)

关于Mongodb-hadoop-connector 会创建很多分区,即使我向它添加查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38259056/

相关文章:

json - Pyspark:读取对象之间没有分隔符的 JSON 数据文件

使用 w=0 的 Mongodb 更新保证

javascript - 如何使用多嵌套数组文档查找单个字段

hadoop - 画面 : Error while using Impala to connect to Cloudera Hadoop

python - 无法将pyspark数据帧保存到Windows 10上的 Parquet

scala - 如何迭代 Spark DataFrame 行?

mongodb - mongoDB安装错误

django - 在 AWS 上使用 MongoDB 部署 Django 应用程序

scala - 在 Apache Spark 中按列分区到 S3

Hadoop 名称节点和日志节点自动占用磁盘空间