java - org.apache.spark.SparkException : Task not serializable, 除实现 java.io.Serializable 之外的任何其他解决方案

标签 java apache-spark

当我在 Spark(由 java 编写)应用程序中使用 UDF 函数时,出现此错误。

org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerialized(ClosureCleaner.scala:403) ... 引起:java.io.NotSerializedException:jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic 序列化堆栈: - 对象不可序列化(类:jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic,值:jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic@f237ae7)

这是我的代码:

//"alarmMeasure" is a dataset from postgres 
//"macroInfo" is also a dataset,but from csv
alarmMeasure.sparkSession().sqlContext().udf().register("genrateKeyId", new UDF2<String,String,String>() {

                @Override
                public String call(String almDetectionCode,String time) throws Exception {
                    StringBuilder keyId = new StringBuilder();
                    time = DateTimeUtils.transform(time,"yyyy-MM-dd hh:mm:ss","yyyyMMddhhmm");
                    keyId.append("KNLG");
                    keyId.append("_");
                    keyId.append(almDetectionCode);
                    keyId.append("_");
                    keyId.append(time);
                    return keyId.toString();
                }
            }, DataTypes.StringType);
Dataset tmp = alarmMeasure.join(macroInfo,alarmMeasure.col("deviceName")
                    .equalTo(macroInfo.col("deviceName")),"inner")
                    .drop(macroInfo.col("deviceName"));
tmp.withColumn("KeyId",functions.callUDF("genrateKeyId",tmp.col("alarmDectionCode"),tmp.col("alarmDectionCode").show();

一些博客告诉我应该实现 java.io.Serialized,所以我尝试了,但得到了同样的错误,如下所示:

引起:java.io.NotSerializedException:jp.co.nec.necdas.commons.spark.SparkContextManager 序列化堆栈: - 对象不可序列化(类:jp.co.nec.necdas.commons.spark.SparkContextManager,值:jp.co.nec.necdas.commons.spark.SparkContextManager@153cfd86)

“SparkContextManager”是我的类中使用的 API,这是否意味着我必须确保我的类中使用的所有类都实现 java.io.Serializable?

最佳答案

您有两个选择:

  1. 不可序列化的对象应定义为静态,因为静态字段不是对象的一部分,并且它们不会被序列化。
  2. 在首次使用时实例化不可序列化对象,因此它不是发送到 Worker 的可序列化对象的一部分。

实际上,第二个选项是我在当前项目中使用的选项,其中我们的 UDF 必须使用我们无法修改的外部 jar。

关于java - org.apache.spark.SparkException : Task not serializable, 除实现 java.io.Serializable 之外的任何其他解决方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58428532/

相关文章:

java - 谁来帮帮我。用API重新编译后出现这个问题

java - Spring 启动和 ddl-auto 问题

java - Java 中的 SparkContext 并行化调用示例

apache-spark - 如何处理 yarn 客户端中运行时间过长的任务(与其他工作相比)?

java - 这段代码编译错误

java - 在 Cassandra java 驱动程序中获取 Select 查询的 LIMIT 值

java - 如何累积运行 Spark sql 聚合器?

python-3.x - ModuleNotFoundError : No module named 'py4j'

java - 创建 JavaSparkContext 时出现 NoClassDefFoundError

java - 如何在 JFreeChart 中的数 Axis 域上旋转刻度线标签?