python - Spark 程序在独立集群上运行时给出奇怪的结果

标签 python apache-spark pyspark bigdata

我有这个 spark 程序,我会尽量将它限制在相关部分

# Split by delimiter ,
# If the file is in unicode, we need to convert each value to a float in order to be able to 
# treat it as a number
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist()

# start with K randomly selected points from the dataset
# A centroid cannot be an actual data point or else the distance measure between a point and 
# that centroid will be zero. This leads to an undefined membership value into that centroid.
centroids = points.takeSample(False, K, 34)
#print centroids
# Initialize our new centroids
newCentroids = [[] for k in range(K)]
tempCentroids = []
for centroid in centroids:
    tempCentroids.append([centroid[N] + 0.5])
#centroids = sc.broadcast(tempCentroids)

convergence = False

ncm = NCM()

while(not convergence):
    memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m)))
    cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value)))
    # Memberships
    T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)))
    I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2,  weight3, c)[0]))
    F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1,  weight2, weight3, c)[0]))
    # Components of new centroids
    wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t))))
    #print "wTm = " + str(wTm.collect())
    print "at first reduce"
    sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2))
    #print "sumwTm = " + str(sumwTm.collect())
    wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t))))
    print "adding to cnumerator list"
    #print wTmx.collect()
    cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values()
    print "collected cnumerator, now printing"    
    #print "cnumerator = " + str(cnumerator.collect())
    #print str(sumwTm.collect())
    # Calculate the new centroids
    sumwTmCollection = sumwTm.collect()[0][1]
    cnumeratorCollection = cnumerator.collect()
    #print "sumwTmCollection = " + str(sumwTmCollection)
    #cnumeratorCollection =cnumerator.collectAsMap().get(0).items
    print "cnumeratorCollection = " + str(cnumeratorCollection)
    for i in range(len(newCentroids)):
        newCentroids[i] = scalarMult(1 / sumwTmCollection[i], [cnumeratorCollection[i]])
    centroids = newCentroids
    # Test for convergence
    convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon)

    #convergence = True 
    # Replace our old centroids with the newly found centroids and repeat if convergence not met
    # Clear out space for a new set of centroids
    newCentroids = [[] for k in range(K)]

该程序在我的本地机器上运行良好,但是,在独立集群上运行时,它的行为不如预期。它不一定会引发错误,但它的作用是提供与我在本地计算机上运行时收到的不同的输出。集群和 3 个节点似乎工作正常。我感觉问题在于我不断更新 centroids,这是一个 python 列表,每次通过 while-loop 都会更改。是否有可能每个节点都没有该列表的最新副本?我认为是这样,所以我尝试使用 broadcast variable 但这些变量无法更新(只读)。我也尝试过使用 accumulator 但这些只是为了累加。我还尝试将 python 列表保存为 hdfs 上的文件,以便每个节点都可以访问,但这效果不佳。你认为我对问题的理解正确吗?这里可能还会发生其他事情吗?如何获得在我的本地计算机上正常运行但在集群上运行不正常的代码?

最佳答案

感谢您花时间关注这个问题,特别是因为听起来我本可以发布更多信息来让您的工作更轻松。这里的问题是

centroids = points.takeSample(False, K, 34)

我没有意识到这一点,但经过短暂的实验后,这个函数每次都返回相同的输出,尽管我认为这是一个随机样本。只要您使用相同的种子(在本例中为 34),您就会得到相同的 RDD 作为返回。由于某种原因,我集群上的 RDD 与返回到本地计算机的 RDD 不同。无论如何,因为每次都是相同的 RDD,所以我的输出永远不会改变。返回给我的“随机”质心的问题是,这些特殊的质心产生了类似于数学中的鞍点的东西,在那里不会发现质心的收敛。这部分答案是数学的和编程的,所以我不会再提了。在这一点上,我真正的希望是,如果你愿意,其他人会得到帮助

centroids = points.takeSample(False, K, 34)

为了在每次调用时生成不同的样本,您每次都将种子更改为某个随机数。

我希望这对您有所帮助。我以前从来没有花这么多时间来解决我的内存问题。

再次感谢。

关于python - Spark 程序在独立集群上运行时给出奇怪的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36816934/

相关文章:

apache-spark - 如何释放 Dataproc 中 block 池使用的空间

machine-learning - Pyspark 中的过采样或 SMOTE

python - 比较字符串和张量

python - 如何在图像文件中保存浮点像素值

python - 在 python 中读取 15 M 行 csv 文件的有效方法

apache-spark - Spark Graphframes 大数据集和内存问题

apache-spark - 从 Spark 数据框中选择不同值的最有效方法是什么?

python - 如何在字典中以原始顺序返回键

apache-spark - Spark提交使用其他容器

scala - Spark : Replace Null value in a Nested column