scala - 了解 UID 在 Spark MLLib Transformer 中的作用

标签 scala apache-spark apache-spark-sql apache-spark-mllib apache-spark-ml

我正在使用 Apache Spark 和 Scala 创建一个 ML 管道。我的管道中的一个变形金刚做了一个昂贵的 join在这个过程的早期操作。由于我的 ParamGrid 中有很多功能这意味着节目要撑得住这个巨大的,加入DataFrame在内存中优化网格中的每个特征。

为了尝试解决这个问题,我创建了一个自定义 Transformer缓存这么大的中级 DataFrame通过将其写入 S3 中的 Parquet 并返回从 Parquet 读取的数据帧。这运行良好并提高了模型的速度,直到我向 ParamGrid 添加了功能在缓存阶段之前上演。当我将 Parquet 写入 S3 时,我使用由以下条件确定的路径:

class Cacher(override val uid: String) extends Transformer {

  // the cachePath variable determines the path within the S3 bucket
  lazy val cachePath = Identifiable.randomUID(uid + "transformer-cache")

  // ...  

我想我误解了 uid正在工作...我的信念是,每当 Spark 优化 ParamGrid ,它使用管道中那个点上演的任何类,创建它们的新实例,并为它们提供新的、独特的 uid s 来跟踪它们。我怀疑缓存出了问题,因为 Spark 没有给出唯一的 uid到新Transformer它创建的实例意味着每当缓存的新实例Transformer 时,缓存的 Parquet 都会被不断覆盖。被 build 。任何人都可以就如何生成唯一的随机提供任何指示uid s 对于管道创建的每个阶段实例?

干杯!

最佳答案

一步步:

  • uid Identifiable 要求trait( Transformer 延伸 PipelineStage 延伸 Params 延伸 Identifiable )。
  • 根据Identifiable文档 uid is :

    An immutable unique ID for the object and its derivatives.

  • 一般来说:
  • Params是可变的。设置参数返回this并且不影响 uid .

    import org.apache.spark.ml.feature.OneHotEncoder
    
    val enc = new OneHotEncoder()
    val enc_  = enc.setInputCol("foo")
    
    enc == enc_
    

    Boolean = true
    

    enc.uid == enc_.uid
    

    Boolean = true
    
  • copying Params创建一个新实例但保持不变 uid (请参阅上一点引用的强调部分)。

    val encCopy = enc.copy(new org.apache.spark.ml.param.ParamMap())
    
    encCopy == enc
    

    Boolean = false
    

    encCopy.uid == enc.uid
    

    Boolean = true
    
  • 您可以尝试覆盖 copy method避免 copying parent uid 但这似乎与制作 Params 背后的整个想法相冲突。 Identifiable .

  • 可能的解决方案:
  • 不要使用变压器 uid或使路径依赖于当前的参数集。
  • 不要手动写入缓存文件并使用内置缓存机制 ( Dataset.persist )。它不仅解决了手头的问题,还解决了退出时释放资源的隐藏问题。
  • 关于scala - 了解 UID 在 Spark MLLib Transformer 中的作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40874941/

    相关文章:

    scala - 如何在超时的情况下在 Scala 中执行操作?

    scala - Scala 中具有尾函数的最大元素

    java - 如何替换字符串类型列中的子字符串?

    apache-spark - 在 Spark 中压缩序列文件?

    scala - 过滤数据帧spark scala以查找大于当前时间的日期

    scala - 有条件的理解

    java - 通过 javaagent 拦截具有丰富参数的方法

    java - 无法从 Java 连接到 Kubernetes 中运行的 Spark

    python - Spark中groupBy的使用

    python - Pyspark 将列类型从日期更改为字符串