apache-spark - Apache Spark : Applying a function from sklearn parallel on partitions

标签 apache-spark

我是大数据和 Apache Spark 的新手(也是一名在导师指导下工作的本科生)。

是否可以将函数(即样条)仅应用于 RDD 的分区?我正在尝试实现论文 here 中的一些工作.

《Learning Spark》这本书似乎表明这是可能的,但没有解释如何实现。

"If you instead have many small datasets on which you want to train different learning models, it would be better to use a single- node learning library (e.g., Weka or SciKit-Learn) on each node, perhaps calling it in parallel across nodes using a Spark map()."

最佳答案

实际上,我们有一个库可以做到这一点。我们有几个 sklearn transformators 和 predictors 正在运行。它的名字是 sparkit-learn。
从我们的示例中:

from splearn.rdd import DictRDD  
from splearn.feature_extraction.text import SparkHashingVectorizer  
from splearn.feature_extraction.text import SparkTfidfTransformer  
from splearn.svm import SparkLinearSVC  
from splearn.pipeline import SparkPipeline  

from sklearn.feature_extraction.text import HashingVectorizer  
from sklearn.feature_extraction.text import TfidfTransformer  
from sklearn.svm import LinearSVC  
from sklearn.pipeline import Pipeline  

X = [...]  # list of texts  
y = [...]  # list of labels  
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parralelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
))

local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))

y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])

你可以找到它here .

关于apache-spark - Apache Spark : Applying a function from sklearn parallel on partitions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30653000/

相关文章:

hadoop - 在 spark yarn 集群中,容器如何工作取决于 RDD 分区的数量?

apache-spark - 从嵌套字典创建 Spark DataFrame

java - 如何为 Spark 流检查点设置检查点间隔?

scala - 在Spark中使用复杂过滤从Elasticsearch获取esJsonRDD

apache-spark - 为什么类型化 Dataset API 中不使用谓词下推(相对于非类型化 DataFrame API)?

ubuntu - sbt 包错误 : Couldn't retrieve source module: org. scala-sbt :compiler-interface:0. 13.16:component

apache-spark - 无法在Kubernetes POD上部署Spark历史记录服务器

python - 无法将 spark 数据框列与 df.withColumn() 合并

hadoop - 如何使用 Spark 创建 MapFile 并访问它?

apache-spark - Spark RDD union 非常慢