I followed this post to run KMeans in parallel. I am using Python 2.7 and Spark 2.0.2 on EMR.
How to run multiple jobs in one Sparkcontext from separate threads in PySpark?
As quoted in that post, jobs submitted from separate threads should not affect each other.
"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)." http://spark.apache.org/docs/latest/job-scheduling.html
However, the number of clusters K reported for each fit differs from the K that was passed in.
Code:
from multiprocessing.pool import ThreadPool
import random

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from sklearn.datasets import make_blobs

random.seed(1)

group_size = 30
n_groups = 20
n_samples = n_groups * group_size
n_features = 2
n_centers = 4

xs, ys = make_blobs(n_samples=n_samples, n_features=n_features,
                    centers=n_centers, cluster_std=1.0,
                    center_box=(-10.0, 10.0), shuffle=True, random_state=None)

# Split the samples into n_groups chunks of group_size rows each.
x_groups = [xs[i * group_size:(i + 1) * group_size] for i in range(n_groups)]


def do_kmean(xs):
    # `spark` is assumed to be an existing SparkSession
    # (for example, the one the PySpark shell creates).
    data = [(Vectors.dense(x.tolist()),) for x in xs]
    df = spark.createDataFrame(data, ["features"])
    num_clusters = random.randint(5, 10)
    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1,
                    featuresCol="features", predictionCol="prediction")
    model = kmeans.fit(df)
    # Return the requested K alongside the K the estimator reports.
    return [num_clusters, kmeans.getK()]


# Fit all groups concurrently from 8 threads that share one SparkContext.
tpool = ThreadPool(processes=8)
result = tpool.map(do_kmean, x_groups)
Result (requested K vs. the K that KMeans actually used):
[[5, 9],
[8, 9],
[6, 8],
[10, 9],
[7, 9],
[9, 9],
[7, 9],
[9, 9],
[5, 5],
[5, 9],
[9, 7],
[9, 9],
[5, 7],
[10, 5],
[7, 7],
[7, 7],
[6, 6],
[10, 10],
[10, 10],
[5, 5]]
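To see how often the two values diverge, the pairs above can be tallied. This is a minimal sketch in plain Python: the `result` list is copied from the output above, and the names `mismatches` and `mismatch_rate` are illustrative, not part of the original code.

```python
# Pairs of [requested K, K reported by kmeans.getK()], copied from the output above.
result = [[5, 9], [8, 9], [6, 8], [10, 9], [7, 9], [9, 9], [7, 9], [9, 9],
          [5, 5], [5, 9], [9, 7], [9, 9], [5, 7], [10, 5], [7, 7], [7, 7],
          [6, 6], [10, 10], [10, 10], [5, 5]]

# Keep the index of every run whose reported K differs from the requested K.
mismatches = [(i, requested, reported)
              for i, (requested, reported) in enumerate(result)
              if requested != reported]

# float() keeps the division exact under Python 2.7 as well.
mismatch_rate = float(len(mismatches)) / len(result)
print("%d of %d runs reported a different K" % (len(mismatches), len(result)))
```

For this particular run, 10 of the 20 fits report a K other than the one requested.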
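Spark does not appear to be thread/process safe here: each fit seems to read another thread's copy of K. Is there a Spark configuration that causes this, or is it a Spark bug?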
Best answer
This is indeed a bug in Spark 2.0.2 and 2.1.0. I was able to reproduce it on my local machine with both versions. Spark 2.1.1 fixes the bug.
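Given that answer, it may help to check the running Spark version before fitting estimators from several threads. The following is a minimal sketch, not a Spark utility: `parse_spark_version` and `has_reported_fix` are hypothetical helper names, and the check assumes that 2.1.1 and every later release contain the fix, based only on the versions named above.

```python
import re

# First release reported as fixed in the answer above.
FIRST_FIXED = (2, 1, 1)


def parse_spark_version(version):
    """Return (major, minor, patch) from a string such as '2.1.1' or '2.2.0-SNAPSHOT'."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError("unrecognized Spark version: %r" % (version,))
    return tuple(int(part) for part in match.groups())


def has_reported_fix(version):
    # Assumption: every release at or after FIRST_FIXED carries the fix.
    # Releases before 2.0.2 were not tested in the answer; they are
    # conservatively treated as not fixed here.
    return parse_spark_version(version) >= FIRST_FIXED


# With a live session the string would come from the session itself:
# has_reported_fix(spark.version)
```

When the check returns False, the options that follow from the answer are to upgrade to 2.1.1 or later, or to run the fits one at a time.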
Source for "multithreading - PySpark parallel ml.KMeans overwriting each other's K": the corresponding question on Stack Overflow, https://stackoverflow.com/questions/44275173/