scala - 如何在 MLlib 中编写自定义 Transformer?

标签 scala apache-spark apache-spark-sql apache-spark-mllib

我想在 scala 的 spark 2.0 中为管道编写自定义 Transformer。到目前为止,我还不清楚 copytransformSchema 方法应该返回什么。他们返回一个 null 是否正确? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java复制?

由于 Transformer 扩展了 PipelineStage 我得出结论,fit 调用了 transformSchema 方法。我是否正确理解 transformSchema 类似于 sk-learns fit?

因为我的 Transformer 应该将数据集与(非常小的)第二个数据集连接起来,所以我也想将该数据集存储在序列化管道中。我应该如何将其存储在转换器中以正确使用管道序列化机制?

一个简单的转换器会是什么样子,它计算单个列的平均值并填充 nan 值 + 保留该值?

@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
    class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {

      def transform(df: Dataset[MyClass]): DataFrame = {

      }

      override def copy(extra: ParamMap): Transformer = {
      }

      override def transformSchema(schema: StructType): StructType = {
        schema
      }
    }

最佳答案

transformSchema 应返回应用 Transformer 后预期的架构。示例:

  • 如果transfomer添加IntegerType列,输出列名是foo:

    import org.apache.spark.sql.types._
    
    override def transformSchema(schema: StructType): StructType = {
       schema.add(StructField("foo", IntegerType))
    }
    

So if the schema is not changed for the dataset as only a name value is filled for mean imputation I should return the original case class as the schema?

这在 Spark SQL(和 MLlib,也是)中是不可能的,因为 Dataset 一旦创建就不可变。您只能添加或“替换”(添加后跟 drop 操作)列。

关于scala - 如何在 MLlib 中编写自定义 Transformer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40615713/

相关文章:

Scala for-comprehension 语法

apache-spark - 从 Spark 数据框中选择不同值的最有效方法是什么?

postgresql - Pyspark 连接到 ipython 笔记本中的 Postgres 数据库

python - PySpark,通过 JSON 文件导入模式

scala - java.lang.NoClassDefFoundError : scala/runtime/LazyBoolean

java - Spark 数据集 - 读取 CSV 并写入空输出

scala - C++ typeid 的 Scala 等价物是什么?

python - Pyspark --py-files 不起作用

scala - Netty版本与Spark + Elasticsearch Transport冲突

scala - 基于 Scala 数组过滤或标记行