scala - 如何使用 foreachPartition 在 Spark 中为每个分区高效构建一个 ML 模型?

标签 scala apache-spark apache-spark-ml

我正在尝试为我的数据集的每个分区拟合一个 ML 模型,但我不知道如何在 Spark 中执行此操作。

我的数据集基本上看起来像这样并且按公司划分:

Company | Features | Target

A         xxx        0.9
A         xxx        0.8
A         xxx        1.0
B         xxx        1.2
B         xxx        1.0
B         xxx        0.9
C         xxx        0.7
C         xxx        0.9
C         xxx        0.9

我的目标是以并行方式为每家公司训练一个回归器(我有几亿条记录,有 10 万家公司)。 我的直觉是我需要使用 foreachPartition 来并行处理分区(即我的公司)并训练和保存每个公司模型。 我的主要问题是如何处理 foreachPartition 调用的函数中要使用的 iterator 类型。

这是它的样子:

dd.foreachPartition(

    iterator => {var company_df = operator.toDF()
                 var rg = RandomForestRegressor()
                                 .setLabelCol("target")
                                 .setFeaturesCol("features")
                                 .setNumTrees(10)
                 var model = rg.fit(company_df)
                 model.write.save(company_path)
                 }
)

据我了解,尝试将 iterator 转换为 dataframe 是不可能的,因为 RDD 的概念本身不能存在于 foreachPartition 中 语句。

我知道这个问题很开放,但我真的卡住了。

最佳答案

在 pyspark 中你可以做如下的事情

import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
      X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)

关于scala - 如何使用 foreachPartition 在 Spark 中为每个分区高效构建一个 ML 模型?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58592081/

相关文章:

python - 查找 Pyspark 中两个日期之间的周末天数

python - 将数据从 Dataframe 传递到现有 ML VectorIndexerModel 时出错

apache-spark - 如何将命名参数发送到 spark-submit

java - Spark : replace null values in dataframe with mean of column

python - 随机森林分类器 - 将索引标签标记转换回字符串值

apache-spark - PySpark PCA : how to convert dataframe rows from multiple columns to a single column DenseVector?

scala - 可变参数的默认空大小写

scala - Scala 类型类模式中隐式定义的运行时成本

scala - 在scala中解析csv文件

regex - 删除 Spark 中 RDD 行中的连续空格