python - Apache Spark 和 scikit_learn 之间的 KMeans 结果不一致

标签 python apache-spark scikit-learn pyspark k-means

我正在使用 PySpark 对数据集执行聚类。为了找到聚类的数量,我在一系列值 (2,20) 上执行了聚类,并为 k 的每个值找到了 wsse(簇内平方和)值>。在这里我发现了一些不寻常的东西。根据我的理解,当你增加集群的数量时,wsse 单调减少。但我得到的结果却不是这样。我只显示前几个集群的 wsse

Results from spark

For k = 002 WSSE is 255318.793358
For k = 003 WSSE is 209788.479560
For k = 004 WSSE is 208498.351074
For k = 005 WSSE is 142573.272672
For k = 006 WSSE is 154419.027612
For k = 007 WSSE is 115092.404604
For k = 008 WSSE is 104753.205635
For k = 009 WSSE is 98000.985547
For k = 010 WSSE is 95134.137071

如果您查看 k=5k=6wsse 值,您将看到 wsse 增加了。我转向 sklearn 看看我是否得到了类似的结果。我用于 spark 和 sklearn 的代码位于帖子末尾的附录部分。我尝试对 spark 和 sklearn KMeans 模型中的参数使用相同的值。以下是 sklearn 的结果,它们正如我预期的那样 - 单调递减。

Results from sklearn

For k = 002 WSSE is 245090.224247
For k = 003 WSSE is 201329.888159
For k = 004 WSSE is 166889.044195
For k = 005 WSSE is 142576.895154
For k = 006 WSSE is 123882.070776
For k = 007 WSSE is 112496.692455
For k = 008 WSSE is 102806.001664
For k = 009 WSSE is 95279.837212
For k = 010 WSSE is 89303.574467

我不确定为什么我的 wsse 值会在 Spark 中增加。我尝试使用不同的数据集,也发现了类似的行为。有什么地方我出错了吗?任何线索都会很棒。


附录

数据集位于 here .

读取数据并设置声明变量

# get data
import pandas as pd
url = "https://raw.githubusercontent.com/vectosaurus/bb_lite/master/3.0%20data/adult_comp_cont.csv"

df_pandas = pd.read_csv(url)
df_spark = sqlContext(df_pandas)
target_col = 'high_income'
numeric_cols = [i for i in df_pandas.columns if i !=target_col]

k_min = 2 # 2 in inclusive
k_max = 21 # 2i is exlusive. will fit till 20

max_iter = 1000
seed = 42    

这是我用来获取 sklearn 结果的代码:

from sklearn.cluster import KMeans as KMeans_SKL
from sklearn.preprocessing import StandardScaler as StandardScaler_SKL

ss = StandardScaler_SKL(with_std=True, with_mean=True)
ss.fit(df_pandas.loc[:, numeric_cols])
df_pandas_scaled = pd.DataFrame(ss.transform(df_pandas.loc[:, numeric_cols]))

wsse_collect = []

for i in range(k_min, k_max):
    km = KMeans_SKL(random_state=seed, max_iter=max_iter, n_clusters=i)
    _ = km.fit(df_pandas_scaled)
    wsse = km.inertia_
    print('For k = {i:03d} WSSE is {wsse:10f}'.format(i=i, wsse=wsse))
    wsse_collect.append(wsse)

这是我用来获取 spark 结果的代码

from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.clustering import KMeans

standard_scaler_inpt_features = 'ss_features'
kmeans_input_features = 'features'
kmeans_prediction_features = 'prediction'


assembler = VectorAssembler(inputCols=numeric_cols, outputCol=standard_scaler_inpt_features)
assembled_df = assembler.transform(df_spark)

scaler = StandardScaler(inputCol=standard_scaler_inpt_features, outputCol=kmeans_input_features, withStd=True, withMean=True)
scaler_model = scaler.fit(assembled_df)
scaled_data = scaler_model.transform(assembled_df)

wsse_collect_spark = []

for i in range(k_min, k_max):
    km = KMeans(featuresCol=kmeans_input_features, predictionCol=kmeans_prediction_col,
                        k=i, maxIter=max_iter, seed=seed)
    km_fit = km.fit(scaled_data)
    wsse_spark = km_fit.computeCost(scaled_data)
    wsse_collect_spark .append(wsse_spark)
    print('For k = {i:03d} WSSE is {wsse:10f}'.format(i=i, wsse=wsse_spark))

更新

按照@Michail N 的回答,我更改了 Spark KMeans 模型的 tolmaxIter 值。我重新运行了代码,但我看到相同的行为在重复。但是自从 Michail 提到

Spark MLlib, in fact, implements K-means||

我将 initSteps 的数量增加了 50 倍,然后重新运行该过程,结果如下。

For k = 002 WSSE is 255318.718684
For k = 003 WSSE is 212364.906298
For k = 004 WSSE is 185999.709027
For k = 005 WSSE is 168616.028321                                               
For k = 006 WSSE is 123879.449228                                               
For k = 007 WSSE is 113646.930680                                               
For k = 008 WSSE is 102803.889178                                               
For k = 009 WSSE is 97819.497501                                                
For k = 010 WSSE is 99973.198132                                                
For k = 011 WSSE is 89103.510831                                                
For k = 012 WSSE is 84462.110744                                                
For k = 013 WSSE is 78803.619605                                                
For k = 014 WSSE is 82174.640611                                                
For k = 015 WSSE is 79157.287447                                                
For k = 016 WSSE is 75007.269644                                                
For k = 017 WSSE is 71610.292172                                                
For k = 018 WSSE is 68706.739299                                                
For k = 019 WSSE is 65440.906151                                                
For k = 020 WSSE is 66396.106118

wssek=5k=6的增加消失了。尽管如果您查看 k=13k=14 以及其他地方,该行为仍然存在,但至少我知道这是从哪里来的。

最佳答案

WSSE 没有单调递减没有错。理论上,如果集群是最优的,WSSE 必须单调递减,这意味着从所有可能的 k 中心集群中,具有最佳 WSSE 的集群。

问题是K-means不一定能找到最优聚类 对于给定的 k。其迭代过程可以从一个随机起点收敛到 局部最小值,这可能很好但不是最优的。

有像K-means++这样的方法和 Kmeans||具有选择算法的变体,更有可能选择不同的、分离的质心,并更可靠地导致良好的聚类和 Spark MLlib,事实上,实现了 K-means||。但是,所有选择仍然具有随机性,不能保证最佳聚类。

为 k=6 选择的随机起始聚类集可能会导致特别次优的聚类,或者它可能在达到局部最优值之前就提前停止了。

您可以通过更改 parameters of Kmeans 来改进它手动。该算法通过 tol 有一个阈值,它控制被认为重要的簇质心移动的最小量,其中较低的值意味着 K-means 算法将让质心继续移动更长时间。

使用 maxIter 增加最大迭代次数也可以防止它以可能更多的计算为代价而过早停止。

所以我的建议是重新运行集群

 ...
 #increase from default 20
 max_iter= 40     
 #decrase from default 0.0001
 tol = 0.00001 
 km = KMeans(featuresCol=kmeans_input_features, predictionCol=kmeans_prediction_col, k=i, maxIter=max_iter, seed=seed , tol = tol )
 ...

关于python - Apache Spark 和 scikit_learn 之间的 KMeans 结果不一致,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50406096/

相关文章:

python - 在一个脚本中更新列表并从另一个脚本访问更新的列表

python - 尝试根据输入创建一个简单的按属性选择的脚本

scala - Scala : compilation error: not found type

apache-spark - 使用 Spark.sql 插入 TempView

python - 使用 FeatureUnion 拟合管道时出现 IndexError

python - scikit learn安装难度

python - 在 AWS Sagemaker 中为 scikit learn 模型创建端点

python - 如何处理 GRequests 中的错误?

python - 如何判断 apply_async 函数是否已启动或它是否仍在 multiprocessing.Pool 队列中

mongodb - 从 Spark 2.2.0 连接到 MongoDB 时出现问题