我想在 scala 的 spark 2.0 中为管道编写自定义 Transformer
。到目前为止,我还不清楚 copy
或 transformSchema
方法应该返回什么。他们返回一个 null
是否正确? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java复制?
由于 Transformer
扩展了 PipelineStage
我得出结论,fit
调用了 transformSchema
方法。我是否正确理解 transformSchema
类似于 sk-learns fit?
因为我的 Transformer
应该将数据集与(非常小的)第二个数据集连接起来,所以我也想将该数据集存储在序列化管道中。我应该如何将其存储在转换器中以正确使用管道序列化机制?
一个简单的转换器会是什么样子,它计算单个列的平均值并填充 nan 值 + 保留该值?
@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {
def transform(df: Dataset[MyClass]): DataFrame = {
}
override def copy(extra: ParamMap): Transformer = {
}
override def transformSchema(schema: StructType): StructType = {
schema
}
}
最佳答案
transformSchema
应返回应用 Transformer
后预期的架构。示例:
如果transfomer添加
IntegerType
列,输出列名是foo
:import org.apache.spark.sql.types._ override def transformSchema(schema: StructType): StructType = { schema.add(StructField("foo", IntegerType)) }
So if the schema is not changed for the dataset as only a name value is filled for mean imputation I should return the original case class as the schema?
这在 Spark SQL(和 MLlib,也是)中是不可能的,因为 Dataset
一旦创建就不可变。您只能添加或“替换”(添加后跟 drop
操作)列。
关于scala - 如何在 MLlib 中编写自定义 Transformer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40615713/