我目前有一个包含 2 列的 Spark Dataframe: 1) 每行包含预测特征向量的列 2) 包含要预测的值的列。
为了辨别在以后的模型中使用的最具预测性的特征,我使用 P 值进行向后消除,如this article所述。 。下面是我的代码:
num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()
for i in range(0, num_vars):
model = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
model = model.fit(scoresDf)
p_values = model.summary.pValues
max_p = np.max(p_values)
if max_p > 0.05:
max_index = p_values.index(max_p)
drop_max_index_udf = udf(lambda elem, drop_index, var_count:
Vectors.dense([elem[j] for j in range(var_count) if j not in [drop_index]]), VectorUDT())
scoresDfs = scoresDf.withColumn("filtered_features", drop_max_index_udf(scoresDf["filtered_features"],
lit(max_index), lit(num_vars)))
num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()
代码可以运行,但唯一的问题是每次迭代所花费的时间都比上一次要长得多。基于this question的答案,看起来代码每次都会重新评估所有先前的迭代。
理想情况下,我想将整个逻辑输入到某个管道结构中,该结构将延迟存储所有内容,然后在调用时按顺序执行而不会重复,但我不确定这是否可能,因为 Spark 中没有一个估计器/变压器函数似乎适合这个用例。
任何指导将不胜感激,谢谢!
最佳答案
您正在循环内重复创建模型。这是一个耗时的过程,需要每个训练数据集和一组参数完成一次。尝试以下操作 -
num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()
modelAlgo = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
model = modelAlgo.fit(scoresDf)
for i in range(0, num_vars):
p_values = model.summary.pValues
max_p = np.max(p_values)
if max_p > 0.05:
max_index = p_values.index(max_p)
drop_max_index_udf = udf(lambda elem, drop_index, var_count:
Vectors.dense([elem[j] for j in range(var_count) if j not in [drop_index]]), VectorUDT())
scoresDfs = scoresDf.withColumn("filtered_features", drop_max_index_udf(scoresDf["filtered_features"],
lit(max_index), lit(num_vars)))
num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()
一旦您对模型感到满意,就可以保存它。当您需要评估数据时,只需阅读此模型并用它进行预测即可。
关于apache-spark - Spark : Running Backwards Elimination By P-Value With Linear Regressions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59603950/