I have the following application, which connects to MongoDB via the MongoDB Spark connector. My code crashes because the SparkContext is null on the executors. Essentially, I read data from MongoDB and process it, and that processing produces additional queries that need to be sent to MongoDB. The final step is to save the data returned by those additional queries. The code I use:
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(...);

JavaPairRDD<String, Document> pairRdd = aggregatedRdd
        .mapToPair(document -> new Tuple2<>(document.getString("_id"), document));

JavaPairRDD<String, List<Document>> mergedRdd = pairRdd.aggregateByKey(new LinkedList<Document>(),
        combineFunction, mergeFunction);
JavaRDD<Tuple2<String, List<Tuple2<Date, Date>>>> dateRdd = mergedRdd.map(...);
// At this point dateRdd contains key/value pairs of:
// Key: a MongoDB document ID (String)
// Value: a List of Tuple2<Date, Date> entries, i.e. date ranges (start time and end time).
// For each of those date ranges I want to retrieve the data out of MongoDB
// and, for now, just save that data.
dateRdd.foreachPartition(partitionIterator -> {
    while (partitionIterator.hasNext()) {
        Tuple2<String, List<Tuple2<Date, Date>>> tuple = partitionIterator.next();
        String fileName = tuple._1;
        List<Tuple2<Date, Date>> dateRanges = tuple._2;
        for (Tuple2<Date, Date> dateRange : dateRanges) {
            Date startDate = dateRange._1;
            Date endDate = dateRange._2;
            // Note: $gt takes the range start and $lt the range end.
            Document aggregationDoc = Document.parse("{ $match: { ts: {$gt: new Date(" + startDate.getTime()
                    + "), $lt: new Date(" + endDate.getTime() + ")}, root_document: \"" + fileName
                    + "\", signals: { $elemMatch: { signal: \"SomeValue\" } } } }");
            // This call reuses the initial MongoSpark RDD with the aggregation pipeline
            // that was just created; the pipeline gets sent to MongoDB.
            JavaMongoRDD<Document> filteredSignalRdd = rdd.withPipeline(Arrays.asList(aggregationDoc));
            String outputFileName = String.format("output_data_%s_%d-%d", fileName,
                    startDate.getTime(), endDate.getTime());
            filteredSignalRdd.saveAsTextFile(outputFileName);
        }
    }
});
The exception I get is:
Job aborted due to stage failure: Task 23 in stage 2.0 failed 4 times, most recent failure: Lost task 23.3 in stage 2.0 (TID 501, hadoopb24): java.lang.IllegalArgumentException: requirement failed: RDD transformation requires a non-null SparkContext.
Unfortunately SparkContext in this MongoRDD is null.
This can happen after MongoRDD has been deserialized.
SparkContext is not Serializable, therefore it deserializes to null.
RDD transformations are not allowed inside lambdas used in other RDD transformations.
at scala.Predef$.require(Predef.scala:233)
at com.mongodb.spark.rdd.MongoRDD.checkSparkContext(MongoRDD.scala:170)
at com.mongodb.spark.rdd.MongoRDD.copy(MongoRDD.scala:126)
at com.mongodb.spark.rdd.MongoRDD.withPipeline(MongoRDD.scala:116)
at com.mongodb.spark.rdd.api.java.JavaMongoRDD.withPipeline(JavaMongoRDD.scala:46)
What is going wrong here, and how can I achieve this "nested", asynchronous creation of new RDDs?
How can I access the MongoSpark "context" from within an executor? The MongoSpark library needs access to the SparkContext, which is not available on executors.
Do I need to ship all the data back to the driver and have the driver issue the new calls to the MongoSpark "context"? I can see how that would work, but it would have to happen asynchronously, i.e. whenever a partition has finished processing its data and has its <String, Tuple<Date, Date>> pairs ready, it pushes them to the driver, which then starts the new queries. How can that be done?
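What I have in mind for the driver-side variant is roughly the following skeleton. The Spark and Mongo calls are commented out because they need a running cluster; I replace Tuple2 with Map.Entry and use epoch millis so the plain-Java part compiles with the JDK alone, and buildOutputName, the class name, and the sample data are placeholders of mine:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Driver-side skeleton: collect the (fileName, dateRanges) pairs to the driver,
// then issue one MongoSpark pipeline per range from there, where the
// SparkContext is available.
public class DriverSideLoop {

    // Mirrors the String.format used for output paths in the executor code.
    static String buildOutputName(String fileName, long startMs, long endMs) {
        return String.format("output_data_%s_%d-%d", fileName, startMs, endMs);
    }

    public static void main(String[] args) {
        // Real job: List<Tuple2<String, List<Tuple2<Date, Date>>>> collected = dateRdd.collect();
        // Simulated here with Map.Entry and epoch-millis pairs:
        List<Map.Entry<String, List<long[]>>> collected = new ArrayList<>();
        collected.add(new AbstractMap.SimpleEntry<>(
                "file_a", List.of(new long[]{1000L, 2000L})));

        for (Map.Entry<String, List<long[]>> entry : collected) {
            for (long[] range : entry.getValue()) {
                String outputFileName = buildOutputName(entry.getKey(), range[0], range[1]);
                // On the driver this is legal, unlike inside foreachPartition:
                // JavaMongoRDD<Document> filtered = rdd.withPipeline(Arrays.asList(matchDoc));
                // filtered.saveAsTextFile(outputFileName);
                System.out.println(outputFileName); // prints output_data_file_a_1000-2000
            }
        }
    }
}
```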
Best Answer
This is expected behavior, and it will not change. Spark does not support:
- nested RDDs,
- nested transformations,
- nested actions,
- accessing the SparkContext or SparkSession from within actions or transformations.
In a case like this, you can probably use a standard Mongo client instead.
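A minimal sketch of how that suggestion could look (my assumption, not code from the answer): inside foreachPartition each executor opens its own MongoClient, so no SparkContext is needed. Only buildMatchStage() runs as-is; the driver calls are left as comments because they need the mongodb-driver-sync jar and a live server, and the class name, connection string, "mydb", and "mycoll" are hypothetical:

```java
// Partition-side query via the plain MongoDB Java driver, avoiding the
// nested-RDD problem. The commented block shows where the client calls
// would go inside foreachPartition.
public class PartitionSideQuery {

    // Builds the same $match stage the original code assembled with Document.parse.
    // Note: $gt takes the range start and $lt the range end.
    static String buildMatchStage(String fileName, long startMs, long endMs) {
        return String.format(
                "{ $match: { ts: { $gt: new Date(%d), $lt: new Date(%d) }, "
                + "root_document: \"%s\", "
                + "signals: { $elemMatch: { signal: \"SomeValue\" } } } }",
                startMs, endMs, fileName);
    }

    public static void main(String[] args) {
        String stage = buildMatchStage("file_a", 1000L, 2000L);
        System.out.println(stage);

        // Inside dateRdd.foreachPartition(partitionIterator -> { ... }):
        // try (MongoClient client = MongoClients.create("mongodb://host:27017")) {
        //     MongoCollection<Document> coll =
        //             client.getDatabase("mydb").getCollection("mycoll");
        //     for (Document d : coll.aggregate(Arrays.asList(Document.parse(stage)))) {
        //         // write d out yourself instead of calling saveAsTextFile
        //     }
        // }
    }
}
```

Opening one client per partition (rather than per document) keeps connection churn low; the try-with-resources block ensures the client is closed when the partition finishes.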
Regarding "mongodb - Spark - how to create a new RDD in map()? (SparkContext is null on executors)", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40550582/