我一直在尝试使用 Apache Spark 来解决一些查询,例如 top-k、skyline 等。
我制作了一个包含 SparkConf
和 JavaSparkContext
的包装器,名为 SparkContext
。此类也实现了可序列化,但由于 SparkConf
和 JavaSparkContext
不可序列化,因此该类也不可序列化。
我有一个名为 TopK
的解决 topK 查询的类,该类实现了可序列化,但该类还有一个不可序列化的 SparkContext
成员变量(由于上述原因).因此,每当我尝试从 RDD 中的 .reduce()
函数中执行 TopK
方法时,我都会遇到异常。
我找到的解决方案是使 SparkContext
成为 transient 。
我的问题是:我应该将 SparkContext
变量保留为 transient 变量还是我犯了一个大错误?
SparkContext
类:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
public class SparkContext implements Serializable {
private final SparkConf sparConf; // this is not serializable
private final JavaSparkContext sparkContext; // this is not either
protected SparkContext(String appName, String master) {
this.sparConf = new SparkConf();
this.sparConf.setAppName(appName);
this.sparConf.setMaster(master);
this.sparkContext = new JavaSparkContext(sparConf);
}
protected JavaRDD<String> textFile(String path) {
return sparkContext.textFile(path);
}
}
TopK
类:
public class TopK implements QueryCalculator, Serializable {
private final transient SparkContext sparkContext;
.
.
.
}
抛出 Task not serializable
异常的示例。 getBiggestPointByXDimension
甚至不会被输入,因为为了让它在 reduce 中执行function 包含它的类 (TopK
) 必须是可序列化的。
private Point findMedianPoint(JavaRDD<Point> points) {
Point biggestPointByXDimension = points.reduce((a, b) -> getBiggestPointByXDimension(a, b));
.
.
.
}
private Point getBiggestPointByXDimension(Point first, Point second) {
return first.getX() > second.getX() ? first : second;
}
最佳答案
对于您的问题:我应该将 SparkContext 变量保持为 transient 吗?
是的。没关系。它仅封装 (Java)SparkContext 并且上下文在工作人员中不可用,因此将其标记为 transient
只是告诉 Serializer 不要序列化该字段。
您也可以拥有自己的不可序列化的 SparkContext
包装器并将其标记为 transient - 与上述效果相同。 (顺便说一句,鉴于 SparkContext 是 spark 上下文的 Scala 类名称,我选择了另一个名称以避免混淆。)
还有一件事:正如您所指出的,Spark 尝试序列化完整的封闭类的原因是因为在闭包中使用了该类的方法。避免那个!使用匿名类或自包含闭包(最后将转化为匿名类)。
关于java - 我应该将变量保留为 transient 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27077247/