我最近想将 Cython
与 Spark 一起使用,为此我关注了 the following reference .
我写了下面提到的程序,但我得到了:
TypeError:
fib_mapper_cython() takes exactly 1 argument (0 given)
Spark 工具.py
def spark_cython(module, method):
def wrapped(*args, **kwargs):
global cython_function_
try:
return cython_function_(*args, **kwargs)
except:
import pyximport
pyximport.install()
cython_function_ = getattr(__import__(module), method)
return cython_function_(*args, **kwargs)
return wrapped()
fib.pyx
def fib_mapper_cython(n):
'''
Return the first fibonnaci number > n.
'''
cdef int a = 0
cdef int b = 0
cdef int j = int(n)
while b<j:
a, b = b, a+b
return b, 1
主.py
from spark_tools import spark_cython
import pyximport
import os
from pyspark import SparkContext
from pyspark import SparkConf
pyximport.install()
os.environ["SPARK_HOME"] = "/home/spark-1.6.0"
conf = (SparkConf().setMaster('local').setAppName('Fibo'))
sc = SparkContext()
sc.addPyFile('file:///home/Cythonize/fib.pyx')
sc.addPyFile('file:///home/Cythonize/spark_tools.py')
lines = sc.textFile('file:///home/Cythonize/nums.txt')
mapper = spark_cython('fib', 'fib_mapper_cython')
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect()
print fib_frequency
每当我运行程序时,我都会得到一个TypeError
。有什么想法吗?
最佳答案
这不是 Cython
也不是 PySpark
问题,不幸的是,您在 spark_cython
的定义过程中添加了一个额外的函数调用。具体来说,包装调用 cython_function
的函数在返回时不带参数调用:
return wrapped() # call made, no args supplied.
因此,当您执行此调用时,您不会返回包装的函数。您所做的是在没有 *args
或 **kwargs
的情况下调用 wrapped
。 wrapped
然后调用 fib_mapper_cython
没有参数(因为 *args, **kwargs
没有提供)因此 TypeError
.
你应该:
return wrapped
并且此问题不应再存在。
关于python - 用 Cython 激发 Spark ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37965248/