scala - Hierarchical Agglomerative Clustering for Spark

Tags: scala apache-spark hierarchical-clustering

I am working on a project with Spark and Scala, and I am looking for a hierarchical clustering algorithm similar to scipy.cluster.hierarchy.fcluster or sklearn.cluster.AgglomerativeClustering that will work on large amounts of data.

Spark's MLlib implements Bisecting k-means, which requires the number of clusters as input. Unfortunately, in my case I don't know the number of clusters, and I would much prefer to use a distance threshold as the input parameter, as is possible in the two Python implementations above.
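For reference, this is roughly what the distance-threshold interface looks like in those two Python implementations (a minimal single-machine sketch on toy data, which of course does not scale the way Spark does):

import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster
from sklearn.cluster import AgglomerativeClustering

X = np.random.rand(100, 2)  # toy data

# scipy: cut the dendrogram at a distance threshold instead of choosing k
Z = linkage(X, method="ward")
labels_scipy = fcluster(Z, t=0.5, criterion="distance")

# sklearn: n_clusters=None together with distance_threshold gives the same kind of control
labels_sklearn = AgglomerativeClustering(
    n_clusters=None, distance_threshold=0.5
).fit_predict(X)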

If anyone knows the answer, I would be very grateful.

Best Answer

I ran into the same problem and, after looking around for a while, found no answer, so I will post what I did here in the hope that it helps someone else, and maybe someone will build on it.

The basic idea of what I did is to use bisecting K-means recursively, splitting clusters in half until every point in a cluster is within a specified distance of its centroid. I am working with GPS data, so there is some extra machinery to handle that.

The first step is to create a model that splits the data in half. I used bisecting K-means, but I think this would work with any PySpark clustering method, as long as you can get the distance to the centroid.

import pyspark.sql.functions as f
from pyspark import SparkContext, SQLContext
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window


bkm = BisectingKMeans().setK(2).setSeed(1)
assembler = VectorAssembler(inputCols=['lat', 'long'], outputCol="features")
adf = assembler.transform(locAggDf)  # locAggDf contains my location info
model = bkm.fit(adf)
# predictions has the original data plus the "features" vector and a
# "prediction" column holding the assigned cluster number
predictions = model.transform(adf)
predictions.persist()
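As a quick sanity check (a minimal sketch, assuming the predictions dataframe and column names above), you can look at how many points landed in each half of the first split:

# each of the two clusters from the first bisection, with its point count
predictions.groupBy('prediction').count().show()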

The next step is our recursive function. The idea here is that we specify a maximum distance from the centroid, and if any point in a cluster is farther away than that, we cut the cluster in half. When a cluster is tight enough to satisfy the condition, I add it to a result array that is used to build the final clustering.

def bisectToDist(model, predictions, bkm, precision, result=[]):
    centers = model.clusterCenters()
    # row[0] is the predicted cluster number, row[1] is the unit, row[2] is the point's lat, row[3] is the point's long
    # centers[row[0]] is the lat/long of the center: centers[row[0]][0] = lat, centers[row[0]][1] = long
    distUdf = f.udf(
        lambda row: getDistWrapper((centers[row[0]][0], centers[row[0]][1], row[1]), (row[2], row[3], row[1])),
        FloatType())  # getDistWrapper is how I calculate the distance between lat/long points, but you can use any distance metric
    predictions = predictions.withColumn('dist', distUdf(
        f.struct(predictions.prediction, predictions.encodedPrecisionUnit, predictions.lat, predictions.long)))

    # create a df of all rows that were in clusters that had a point outside of the threshold
    toBig = predictions.join(
        predictions.groupby('prediction').agg({"dist": "max"}).filter(f.col('max(dist)') > precision).select(
            'prediction'), ['prediction'], 'leftsemi')

    # this could probably be improved
    # get all cluster numbers that were too big
    listids = toBig.select("prediction").distinct().rdd.flatMap(lambda x: x).collect()

    # if all data points are within the specified distance of the centroid we can return the clustering
    if len(listids) == 0:
        return predictions

    # assuming binary clustering for now, so k must be 2
    # if one of the two clusters was small enough there will be no recursive call for that cluster,
    # so we must save it and return it at this depth; the cluster that was too big is cut in half in the loop below
    if len(listids) == 1:
        ok = predictions.join(
            predictions.groupby('prediction').agg({"dist": "max"}).filter(
                f.col('max(dist)') <= precision).select(
                'prediction'), ['prediction'], 'leftsemi')

    for clusterId in listids:
        # get all of the pieces that were too big
        part = toBig.filter(toBig.prediction == clusterId)

        # we now need to refit the subset of the data
        assembler = VectorAssembler(inputCols=['lat', 'long'], outputCol="features")
        adf = assembler.transform(part.drop('prediction').drop('features').drop('dist'))
        model = bkm.fit(adf)
        # predictions now holds the new subclustering and we are ready for recursion
        predictions = model.transform(adf)
        result.append(bisectToDist(model, predictions, bkm, precision, result=result))

    # return anything that was given and already good
    if len(listids) == 1:
        return ok
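The answer never shows getDistWrapper. As a hedged illustration of what such a helper might look like, assuming it takes two (lat, long, unit) tuples and returns a distance in meters (the unit is ignored here), a simple haversine version could be:

import math

def getDistWrapper(p1, p2):
    # hypothetical stand-in: p1 and p2 are (lat, long, unit) tuples
    lat1, lon1, _ = p1
    lat2, lon2, _ = p2
    # haversine great-circle distance in meters
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 6371000.0 * 2.0 * math.asin(math.sqrt(a))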

Finally, we can call the function and build the resulting dataframe:

result = []
# precision is your distance threshold, in the same units getDistWrapper returns
top = bisectToDist(model, predictions, bkm, precision, result=result)
# the top-level call returns any clusters that were already tight enough, so keep that too
if top is not None:
    result.append(top)
# drop any Nones; these come from recursive calls where both halves were too big
result = [r for r in result if r]


r = result[0]
r = r.withColumn('subIdx',f.lit(0))
result = result[1:]
idx = 1
for r1 in result:
    r1 = r1.withColumn('subIdx',f.lit(idx))
    r = r.unionByName(r1)
    idx = idx + 1

# each subcluster only has a 0 or 1 label; to turn that into ids from 0 to n I added the following
r = r.withColumn('delta', r.subIdx * 100 + r.prediction)
r = r.withColumn('delta', r.delta - f.lag(r.delta, 1).over(Window.orderBy("delta"))).fillna(0)
r = r.withColumn('ddelta', f.when(r.delta != 0, 1).otherwise(0))
r = r.withColumn('spacialLocNum', f.sum('ddelta').over(Window.orderBy(['subIdx', 'prediction'])))
# spacialLocNum is the final cluster id
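As a side note, the same 0-to-n renumbering could probably be done more directly with dense_rank; a minimal sketch, assuming the combined dataframe r and the column names above:

# dense_rank over (subIdx, prediction) yields consecutive ids starting at 1; subtract 1 to start at 0
r = r.withColumn(
    'spacialLocNum',
    f.dense_rank().over(Window.orderBy('subIdx', 'prediction')) - 1
)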

Admittedly this is fairly convoluted and slow, but it gets the job done. Hope this helps!

Regarding "scala - Hierarchical Agglomerative Clustering for Spark", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/63344952/
