我有一个数据集,我想使用 Spark 和 Python 并行测试不同的分类器。 例如,如果我想测试决策树和随机森林,如何并行运行它们?
我尝试了几种方法,但我不断得到:
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我试图这样做(使用 scikit-learn 的分类器而不是 Spark 的分类器效果很好:
def apply_classifier(clf, train_dataset, test_dataset):
model = clf.fit(train_dataset)
predictions = model.transform(test_dataset)
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)
return [(model, predictions)]
...
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
classifiers = [dt, rf]
sc.parallelize(classifiers).flatMap(lambda x: apply_classifier(x, train_dataset, test_dataset)).collect()
关于如何做到这一点有什么建议吗?
谢谢!
最佳答案
@larissa-leite
为了克服这个问题,我正在使用[multiprocessing](https://docs.python.org/3/library/multiprocessing.html)
,如 thread 中所述。 .
这是线程的代码:
from multiprocessing import Process
def func1():
print 'func1: starting'
for i in xrange(10000000): pass
print 'func1: finishing'
def func2():
print 'func2: starting'
for i in xrange(10000000): pass
print 'func2: finishing'
if __name__ == '__main__':
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()
请解释一下我为什么要使用它:我使用 OneVsRestClassifier 训练了多个文本分类器模型(超过 200 个)我需要将收到的文本扩展到每个模型。
这里的延迟不到 200 毫秒即可获得所有预测(人类的 the baseline time reaction 可能在 100 毫秒到 420 毫秒之间),所以这个“延迟”对我来说并不是什么大问题。
关于python - 使用 Spark 并行运行不同的分类器/算法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43207505/