apache-spark - Apache Spark 中的非线性 (DAG) ML 管道

标签 apache-spark apache-spark-mllib apache-spark-ml

我已经设置了一个简单的 Spark-ML 应用程序,其中我有一个独立的转换器管道,可以将列添加到原始数据的数据帧中。由于变压器不查看彼此的输出,我希望我可以在非线性 (DAG) 管道中并行运行它们。关于此功能,我只能找到 the Spark ML-Guide 中的这一段。 :

It is possible to create non-linear Pipelines as long as the data flow graph forms a Directed Acyclic Graph (DAG). This graph is currently specified implicitly based on the input and output column names of each stage (generally specified as parameters). If the Pipeline forms a DAG, then the stages must be specified in topological order.



我对这一段的理解是,如果我为每个转换器设置 inputCol(s)、outputCol 参数并在创建管道时按拓扑顺序指定阶段,那么引擎将使用该信息来构建执行 DAG s.t.一旦输入准备好,DAG 的各个阶段就可以运行。

关于这个的一些问题:
  • 我的理解正确吗?
  • 如果对于其中一个阶段/变压器我没有指定输出列(例如,该阶段仅过滤某些行)会发生什么?它会假设出于 DAG 创建目的,该阶段正在更改所有列,因此所有后续阶段都应该等待它吗?
  • 同样,如果对于其中一个阶段我没有指定 inputCol(s),会发生什么?阶段会等到所有之前的阶段都完成吗?
  • 看来我可以指定多个输入列,但只能指定一个输出列。如果转换器将两列添加到数据框中会发生什么(Spark 本身对此没有问题)?有没有办法让 DAG 创建引擎知道呢?
  • 最佳答案

    Is my understanding correct?



    不完全是。因为阶段是按拓扑顺序提供的,所以要以正确的顺序遍历图形,您只需应用 PipelineStages从左到右。这正是您调用 PipelineTransform 时发生的情况。 .

    阶段序列被遍历两次:
  • 使用 transformSchema 验证架构一次which is simply implemented as stages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur)) .这是执行实际模式验证的部分。
  • 一次以适合使用 Transformers 的实际转换数据适合Estimators .这只是a simple for loop which applies stages sequentially one by one .

  • Likewise, what happens if for one of the stages I don't specify an inputCol(s)?



    几乎没有什么有趣的。由于阶段是按顺序应用的,并且给定的 Transformer 应用了唯一的模式验证。使用它的transformSchema方法在实际转换开始之前,它将像任何其他阶段一样处理。

    What happens if a transformer adds two columns to a dataframe



    和上面一样。只要它为后续阶段生成有效的输入模式,它就与任何其他 Transformer 没有区别.

    transformers don't look at the output of one another I was hoping I could run them in parallel



    从理论上讲,您可以尝试构建一个自定义复合转换器,它封装了多个不同的转换,但唯一可以独立执行并从这种类型的操作中受益的部分是模型拟合。在一天结束时,您必须返回一个转换后的 DataFrame可以由下游阶段使用,并且实际转换很可能无论如何都安排为单个数据扫描。

    如果真的值得付出努力,问题仍然存在。虽然可以同时执行多个作业,但它仅提供一些优势,如果可用资源量与处理单个作业所需的工作量相比相对较高。它通常需要一些低级管理(分区数、shuffle 分区数),这不是 Spark SQL 的强项。

    关于apache-spark - Apache Spark 中的非线性 (DAG) ML 管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37541668/

    相关文章:

    sql-server - 我想将数据从 SQL Server DB 移动到 Hbase/Cassandra 等。如何决定使用哪个大数据数据库?

    scala - 卡夫卡+ Spark 流: Multi topic processing in single job

    scala - 获取 TrainValidationSplit scala 的最佳参数

    scala - 如何在scala中保存RandomForestClassifier Spark模型?

    apache-spark - Pyspark 2.0 - IndextoString 错误

    apache-spark - 如何将 DataFrame 中的数据准备为 LibSVM 格式?

    apache-spark - 如何在 spark-graphx 中获得两跳邻居?

    scala - Spark Redis连接器可将数据写入Redis的特定索引

    java - Apache Spark : StackOverflowError when trying to indexing string columns

    scala - 如何在朴素贝叶斯模型的 BinaryClassificationMetrics 评估中给出预测列和标签列