java - 自定义 Transformer 中的 Spark (Java) transformSchema()

标签 java apache-spark apache-spark-sql pipeline apache-spark-ml

我想将我的自定义转换器与 StandardScaler 一起使用:

VectorizerTransformer vectorizerTransformer = new VectorizerTransformer(field.getName());
                pipelineStages.add(vectorizerTransformer);
                StandardScaler scaler = new StandardScaler()
                        .setInputCol(vectorizerTransformer.getOutputColumn())
                        .setOutputCol(field.getName() + "_norm")
                        .setWithStd(true)
                        .setWithMean(true);
                pipelineStages.add(scaler);

但是,当我运行时:

PipelineModel pipelineModel = pipeline.fit(dframe);

我遇到了一个异常(exception):

Exception in thread "main" java.lang.IllegalArgumentException: Field "trans_vector" does not exist.
at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:228)
at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:228)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.types.StructType.apply(StructType.scala:227)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
at org.apache.spark.ml.feature.StandardScalerParams$class.validateAndTransformSchema(StandardScaler.scala:68)
at org.apache.spark.ml.feature.StandardScaler.validateAndTransformSchema(StandardScaler.scala:88)
at org.apache.spark.ml.feature.StandardScaler.transformSchema(StandardScaler.scala:124)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:180)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:180)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:180)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:70)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:132)
at org.sparkexample.PipelineExample.main(PipelineExample.java:90)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

其中字段的名称是 VectorizerTransformer 的输出字段。

在 VectorizerTransformer 中我有代码:

@Override
public StructType transformSchema(StructType arg0) {
    return arg0;
}

我相信问题出在这里。所以我需要在那里写点东西,但具体是什么?我只是向数据框添加新字段

trans -> trans_vector

最佳答案

@Override
public StructType transformSchema(StructType structType) {
    return structType.add(getOutputColumn(),new VectorUDT(),true);
}

就是这么简单。

注意:我使用了http://supunsetunga.blogspot.ru/2016/05/custom-transformers-for-spark.html作为 java 转换器的代码。

关于java - 自定义 Transformer 中的 Spark (Java) transformSchema(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40267377/

相关文章:

apache-spark - 如何在 Spark 集群上运行 spring boot 应用程序

scala - Spark : how to create a row with fields name

java - 枚举:每个实例独有的方法

java - Swagger 引用对 Spring 对象的响应,使用 Swagger 进行分页

java - 如何从另一个文件导入 Tomcat Server.xml 的 Host 指令?

scala - 如何从 spark 中的输出控制台抑制 "Stage 2===>"?

hadoop - 当 aws emr 核心节点在 hadoop 环境中死亡时会发生什么

java - 如何在 Apache Spark 中为两个具有不同结构的 DataFrame 实现 NOT IN

apache-spark - Pyspark - 圆时间表示为最接近刻钟(15 分钟)的整数

java - 计算器 Android 应用程序中的乘法在输出中抛出 Alphabeta