当我在 Spark(由 java 编写)应用程序中使用 UDF 函数时,出现此错误。
org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerialized(ClosureCleaner.scala:403) ... 引起:java.io.NotSerializedException:jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic 序列化堆栈: - 对象不可序列化(类:jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic,值:jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic@f237ae7)
这是我的代码:
//"alarmMeasure" is a dataset from postgres
//"macroInfo" is also a dataset,but from csv
alarmMeasure.sparkSession().sqlContext().udf().register("genrateKeyId", new UDF2<String,String,String>() {
@Override
public String call(String almDetectionCode,String time) throws Exception {
StringBuilder keyId = new StringBuilder();
time = DateTimeUtils.transform(time,"yyyy-MM-dd hh:mm:ss","yyyyMMddhhmm");
keyId.append("KNLG");
keyId.append("_");
keyId.append(almDetectionCode);
keyId.append("_");
keyId.append(time);
return keyId.toString();
}
}, DataTypes.StringType);
Dataset tmp = alarmMeasure.join(macroInfo,alarmMeasure.col("deviceName")
.equalTo(macroInfo.col("deviceName")),"inner")
.drop(macroInfo.col("deviceName"));
tmp.withColumn("KeyId",functions.callUDF("genrateKeyId",tmp.col("alarmDectionCode"),tmp.col("alarmDectionCode").show();
一些博客告诉我应该实现 java.io.Serialized,所以我尝试了,但得到了同样的错误,如下所示:
引起:java.io.NotSerializedException:jp.co.nec.necdas.commons.spark.SparkContextManager 序列化堆栈: - 对象不可序列化(类:jp.co.nec.necdas.commons.spark.SparkContextManager,值:jp.co.nec.necdas.commons.spark.SparkContextManager@153cfd86)
“SparkContextManager”是我的类中使用的 API,这是否意味着我必须确保我的类中使用的所有类都实现 java.io.Serializable?
最佳答案
您有两个选择:
- 不可序列化的对象应定义为静态,因为静态字段不是对象的一部分,并且它们不会被序列化。
- 在首次使用时实例化不可序列化对象,因此它不是发送到 Worker 的可序列化对象的一部分。
实际上,第二个选项是我在当前项目中使用的选项,其中我们的 UDF 必须使用我们无法修改的外部 jar。
关于java - org.apache.spark.SparkException : Task not serializable, 除实现 java.io.Serializable 之外的任何其他解决方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58428532/