mongodb - Spark - 如何在map()中创建新的RDD? (对于 Executor,SparkContext 为 null)

标签 mongodb apache-spark mongodb-query

我有以下应用程序,它通过 MongoDB Spark 连接器利用与 MongoDB 的连接。我的代码崩溃是因为 SparkContext 对于执行器来说为 null。基本上,我从 MongoDB 读取数据,处理这些数据,这会导致需要发送到 MongoDB 的其他查询。最后一步是保存这些附加查询的数据。我使用的代码:

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(...);
    JavaPairRDD<String, Document> pairRdd = aggregatedRdd
            .mapToPair((document) -> new Tuple2(document.get("_id"), document));
    JavaPairRDD<String, List<Document>> mergedRdd = pairRdd.aggregateByKey(new LinkedList<Document>(),
            combineFunction, mergeFunction);

    JavaRDD<Tuple2<String, List<Tuple2<Date, Date>>>> dateRdd = mergedRdd.map(...);

    //at this point dateRdd contains key/value pairs of:
    //Key: a MongoDB document ID (String)
    //Value: List of Tuple<Date, Date> which are date ranges (start time and end time). 

    //For each of that date ranges I want to retrieve the data out of MongoDB
    //and, for now, I just want to save that data

    dateRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, List<Tuple2<Date, Date>>>>>() {
        @Override
        public void call(Iterator<Tuple2<String, List<Tuple2<Date, Date>>>> partitionIterator) throws Exception {
            for (; partitionIterator.hasNext(); ) {
                Tuple2<String, List<Tuple2<Date, Date>>> tuple = partitionIterator.next();
                String fileName = tuple._1;
                List<Tuple2<Date, Date>> dateRanges = tuple._2;

                for (Tuple2<Date, Date> dateRange : dateRanges) {
                    Date startDate = dateRange._1;
                    Date endDate = dateRange._2;

                    Document aggregationDoc = Document.parse("{ $match: { ts: {$lt: new Date(" + startDate.getTime()
                            + "), $gt: new Date(" + endDate.getTime() + ")}, root_document: \"" + fileName
                            + "\", signals: { $elemMatch: { signal: \"SomeValue\" } } } }");


                    //this call will use the initial MongoSpark rdd with the aggregation pipeline that just got created.
                    //this will get sent to MongoDB 
                    JavaMongoRDD<Document> filteredSignalRdd = rdd.withPipeline(Arrays.asList(aggregationDoc));

                    String outputFileName = String.format("output_data_%s_%d-%d", fileName,
                            startDate.getTime(), endDate.getTime());
                    filteredSignalRdd.saveAsTextFile(outputFileName);
                }
            }
        }
    }); 

我得到的异常(exception)是:

Job aborted due to stage failure: Task 23 in stage 2.0 failed 4 times, most recent failure: Lost task 23.3 in stage 2.0 (TID 501, hadoopb24): java.lang.IllegalArgumentException: requirement failed: RDD transformation requires a non-null SparkContext.
Unfortunately SparkContext in this MongoRDD is null.
This can happen after MongoRDD has been deserialized.
SparkContext is not Serializable, therefore it deserializes to null.
RDD transformations are not allowed inside lambdas used in other RDD transformations.
    at scala.Predef$.require(Predef.scala:233)
    at com.mongodb.spark.rdd.MongoRDD.checkSparkContext(MongoRDD.scala:170)
    at com.mongodb.spark.rdd.MongoRDD.copy(MongoRDD.scala:126)
    at com.mongodb.spark.rdd.MongoRDD.withPipeline(MongoRDD.scala:116)
    at com.mongodb.spark.rdd.api.java.JavaMongoRDD.withPipeline(JavaMongoRDD.scala:46)

下图说明了我对应用程序的期望: enter image description here

这里有什么问题,如何实现新 RDD 的“嵌套”异步创建?

如何访问执行器中的 MongoSpark“上下文”? MongoSpark 库需要访问 SparkContext,这在执行器中不可用。

我是否需要再次将所有数据传送给驱动程序,然后让驱动程序向 MongoSpark“上下文”发送新的调用?我可以看到这是如何工作的,但这需要异步完成,即每当分区完成数据处理并具有 <String, Tuple<Date,Date>> 时准备好后,将其推送给驾驶员并让他开始新的查询。如何才能做到这一点?

最佳答案

这是预期的并且不会改变。 Spark 不支持:

  • 嵌套 RDD。
  • 嵌套转换。
  • 嵌套操作。
  • 通过操作/转换访问上下文或 session 。

在这种情况下,您可能可以使用标准 Mongo 客户端。

关于mongodb - Spark - 如何在map()中创建新的RDD? (对于 Executor,SparkContext 为 null),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40550582/

相关文章:

java - 使用 MongoDB 进行 Hibernate 配置

spring - spring batch mongo itemreader 中的问题?

hadoop - 在yarn上运行spark时我们应该使用哪种模式?

node.js - 使用findOneAndUpdate如何检查文档是否被插入或更新?

node.js - Mongodb根据属性将结果拆分为数组

c# - MongoDb C#驱动程序检索增量更新的值

java - 为依赖项引发 ClassNotFoundException

scala - 如何在 Spark 中一次对多列进行聚合

mongodb - 如何从 atlas 备份文件中恢复 mongo?

javascript - 在 Meteor 应用程序上,根据 Mongo 中的字段值向从 Mongo 接收的数据添加类