apache-spark - Pickling error - Cython with PySpark: scikit-learn knn with user defined metric for large dataset

Tags: apache-spark, scikit-learn, pyspark, cython, knn

I want to use Cython and PySpark to speed up scikit-learn's knn with a user-defined metric for a large dataset with 400,000 rows and 65 columns. I have followed the instructions here and here. I am using Spark 1.6.0 and Python 2.7.13.

I wrote the following code for a small sample dataset, but I get the pickling error below:

Traceback (most recent call last):
File "/farzanadata/main.py", line 26, in <module>
bc_nbrs = sc.broadcast(nbrs)
File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 741, in broadcast
File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 70, in __init__
File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 78, in dump
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
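
The error happens when Spark tries to cPickle the fitted NearestNeighbors object: its metric is the nested wrapped function returned by spark_cython below, and cPickle can only serialize a function by reference to an importable module-level name. A minimal sketch (standard library only, not from the original post) showing the difference:

import pickle

def top_level(x):
    # module-level function: pickled by reference to its importable name
    return x * 2

def make_closure():
    factor = 2
    def inner(x):
        # nested function: has no importable name, so pickle fails
        return x * factor
    return inner

print(len(pickle.dumps(top_level)))        # succeeds

try:
    pickle.dumps(make_closure())
except (pickle.PicklingError, AttributeError) as exc:
    # on Python 2 this is the same "Can't pickle <type 'function'>" error as above
    print(exc)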

spark_tools.py
def spark_cython(module, method):
    def wrapped(*args, **kwargs):
        global cython_function_
        try:
            # fast path: the Cython function was already imported on this worker
            return cython_function_(*args, **kwargs)
        except:
            # first call on this worker: compile and import the .pyx module
            import pyximport
            pyximport.install()
            cython_function_ = getattr(__import__(module), method)
        return cython_function_(*args, **kwargs)
    return wrapped

clinical_kernel.pyx
cimport cython
from libc cimport math
cimport numpy as cnp
cnp.import_array()
def mydist(cnp.npy_double[:] x, cnp.npy_double[:] y):
    # clinical kernel: range-normalised similarity for the 3 continuous
    # features plus simple matching for the 2 categorical features
    cdef double ranges[3]
    cdef int k
    cdef double out = 0, out2 = 0
    ranges[:] = [0.04028, 0.0983, 0.06602]
    for k in range(3):
        out += (ranges[k] - math.fabs(x[k] - y[k])) / ranges[k]
    for k in range(3, 5):
        out2 += x[k] == y[k]
    return (out + out2) / 5
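
As an aside (not from the original post), the .pyx module can be smoke-tested on the driver before involving Spark; a minimal sketch, assuming clinical_kernel.pyx is in the working directory:

import numpy as np
import pyximport

# compile the .pyx on the fly; setup_args points Cython at the NumPy headers
pyximport.install(setup_args={'include_dirs': np.get_include()})
import clinical_kernel

x = np.array([0.72694, 1.4742, 0.32396, 1.0, 1.0])
y = np.array([0.74173, 1.5257, 0.36116, 0.0, 0.0])
print(clinical_kernel.mydist(x, y))   # kernel value for the two sample rows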

main.py
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from sklearn.neighbors import NearestNeighbors
import numpy as np
from spark_tools import spark_cython

import pyximport

conf = SparkConf().setAppName('Fibo')
sc = SparkContext(conf=conf)
sc.addFile('file:///farzanadata/clinical_kernel.pyx')
sc.addFile('file:///farzanadata/spark_tools.py')
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
pyximport.install()
import clinical_kernel
df=sc.parallelize([[0.72694,1.4742,0.32396,1,1],[0.74173,1.5257,0.36116,0,0],[0.76722,1.5725,0.38998,1,0],[0.76722, 1.5725, 0.38998,0,1]])
X=np.array(df.collect())
mapper = spark_cython('clinical_kernel', 'mydist')
nbrs=NearestNeighbors(n_neighbors=4,metric=mapper)
nbrs.fit(X)
bc_nbrs = sc.broadcast(nbrs)
neighbors=df.map(lambda x: bc_nbrs.value.kneighbors(x,n_neighbors=4,return_distance=False))
neigh_df = neighbors.map(lambda x: x.tolist()).toDF(["neighbors"])
neigh_df.show()

Instead of broadcasting the KNN tree, the following code works perfectly, but of course it is not ideal for a large dataset.
neighbors=nbrs.kneighbors(X,n_neighbors=4,return_distance=False)
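
If the broadcast itself is fixed (see the accepted answer below), the per-row kneighbors calls in the map above can also be batched per partition, so each executor queries the tree once per partition instead of once per row; a rough sketch, reusing df and bc_nbrs from the code above:

import numpy as np

def neighbors_per_partition(rows):
    # collect a whole partition into one array and query it in a single call
    batch = np.array(list(rows))
    if batch.size == 0:
        return iter([])
    idx = bc_nbrs.value.kneighbors(batch, n_neighbors=4, return_distance=False)
    return iter(idx.tolist())

neighbors = df.mapPartitions(neighbors_per_partition)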

Using import dill as pickle did not help either.

Best Answer

Changing spark_tools.py as follows solves the problem:

def spark_cython(*args, **kwargs):
    # module-level function with the module/method names hard-coded:
    # there is no nested closure left to pickle, so the metric (and the fitted
    # NearestNeighbors object that references it) can be broadcast
    global cython_function_
    module = 'clinical_kernel'
    method = 'mydist'
    try:
        return cython_function_(*args, **kwargs)
    except:
        import pyximport
        pyximport.install()
        cython_function_ = getattr(__import__(module), method)
        return cython_function_(*args, **kwargs)
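
The fix works because spark_cython is now a plain module-level function in spark_tools, so cPickle serializes it by reference rather than trying to serialize a nested function object; the Cython module is still compiled lazily on each worker the first time the metric is called. A hedged sketch of the matching main.py change (my inference, not shown in the original answer; X and sc are as defined in the question's main.py):

from sklearn.neighbors import NearestNeighbors
from spark_tools import spark_cython

nbrs = NearestNeighbors(n_neighbors=4, metric=spark_cython)  # pass the function itself
nbrs.fit(X)
bc_nbrs = sc.broadcast(nbrs)   # now pickles, since the metric is an importable function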

A similar question about this pickling error (Cython with PySpark, scikit-learn knn with a user-defined metric for a large dataset) can be found on Stack Overflow: https://stackoverflow.com/questions/49764147/
