multithreading - Pyspark并行ml.KMeans覆盖彼此的K

标签 multithreading machine-learning concurrency pyspark k-means

我按照这篇文章并行运行 KMeans。我在 EMR 上使用了 Python 2.7 和 Spark 2.0.2。

How to run multiple jobs in one Sparkcontext from separate threads in PySpark?

正如帖子中所引用的,从不同进程提交的作业不应相互影响。

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)." http://spark.apache.org/docs/latest/job-scheduling.html

但是,生成的模型的簇数 K 与传入的不同。

代码:

from pyspark.ml.clustering import KMeans
from sklearn.datasets.samples_generator import make_blobs
from pyspark.ml.linalg import Vectors
import random
random.seed(1)

group_size = 30
n_groups = 20

n_samples= n_groups * group_size
n_features=2
n_centers=4

xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)
x_groups = []
for i in range(n_groups):
    x_groups.append(xs[i*group_size: (i+1)*group_size])


def do_kmean(xs):
    data = []
    for x in xs:
        data.append((Vectors.dense(x.tolist()),) )
    df = spark.createDataFrame(data, ["features"])

    num_clusters = random.randint(5,10)
    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction")
    model = kmeans.fit(df)
    return [num_clusters, kmeans.getK()]

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=8)

result = tpool.map(do_kmean, x_groups)

结果:(输入 K 与 KMeans 实际使用的值)

[[5, 9],
 [8, 9],
 [6, 8],
 [10, 9],
 [7, 9],
 [9, 9],
 [7, 9],
 [9, 9],
 [5, 5],
 [5, 9],
 [9, 7],
 [9, 9],
 [5, 7],
 [10, 5],
 [7, 7],
 [7, 7],
 [6, 6],
 [10, 10],
 [10, 10],
 [5, 5]]

Spark 似乎不是线程/进程安全的,并且正在访问其他进程的 K 副本。任何 Spark 配置是否会导致此问题,或者这是 Spark 错误吗?

最佳答案

这确实是 Spark 2.0.2 和 2.1.0 的一个错误。我能够使用上述两个版本在本地计算机上复制该错误。 Spark 2.1.1 修复了该错误。

https://issues.apache.org/jira/browse/SPARK-19348

关于multithreading - Pyspark并行ml.KMeans覆盖彼此的K,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44275173/

相关文章:

c - 主进程 -> pthread -> fork + execvp

java - Libsvm java训练测试示例(也是实时的)

java - 两个线程写入同一个文件

新手Java——死锁模仿

java - 一个单独的线程可以更改静态变量吗?

java - Android中可以发生优先级倒置吗

email - 如何提取没有签名或引用文本的电子邮件正文

python - 分类中的目标变量是否需要数值编码?

java - 为什么intellij中的所有程序都会导致找不到类错误

java - RxJava - 调度程序与 ExecutorService?