python - 用 Cython 激发 Spark

标签 python pyspark cython

我最近想将 Cython 与 Spark 一起使用,为此我关注了 the following reference .

我写了下面提到的程序,但我得到了:

TypeError:
fib_mapper_cython() takes exactly 1 argument (0 given)

Spark 工具.py

def spark_cython(module, method):
    def wrapped(*args, **kwargs):
        global cython_function_
        try:
            return cython_function_(*args, **kwargs)
        except:
            import pyximport
            pyximport.install()
            cython_function_ = getattr(__import__(module), method)
        return cython_function_(*args, **kwargs)
    return wrapped()

fib.pyx

def fib_mapper_cython(n):
    '''
     Return the first fibonnaci number > n.
    '''
    cdef int a = 0
    cdef int b = 0
    cdef int j = int(n)
    while b<j:
        a, b  = b, a+b
    return b, 1

主.py

from spark_tools import spark_cython
import pyximport
import os
from pyspark import SparkContext
from pyspark import SparkConf
pyximport.install()


os.environ["SPARK_HOME"] = "/home/spark-1.6.0"
conf = (SparkConf().setMaster('local').setAppName('Fibo'))

sc = SparkContext()
sc.addPyFile('file:///home/Cythonize/fib.pyx')
sc.addPyFile('file:///home/Cythonize/spark_tools.py')
lines = sc.textFile('file:///home/Cythonize/nums.txt')

mapper = spark_cython('fib', 'fib_mapper_cython')
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect()
print fib_frequency

每当我运行程序时,我都会得到一个TypeError。有什么想法吗?

最佳答案

这不是 Cython 也不是 PySpark 问题,不幸的是,您在 spark_cython 的定义过程中添加了一个额外的函数调用。具体来说,包装调用 cython_function 的函数在返回时不带参数调用:

return wrapped()  # call made, no args supplied.

因此,当您执行此调用时,您不会返回包装的函数。您所做的是在没有 *args**kwargs 的情况下调用 wrappedwrapped 然后调用 fib_mapper_cython 没有参数(因为 *args, **kwargs 没有提供)因此 TypeError .

你应该:

return wrapped

并且此问题不应再存在。

关于python - 用 Cython 激发 Spark ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37965248/

相关文章:

apache-spark - pyspark 加入多个条件

Python 字典与 C++ 标准 :unordered_map (cython) vs cythonized python dict

python - 为什么比特币椭圆曲线上的这个特殊点看起来不属于那个椭圆曲线?

Python 多处理 map_async 挂起

Python CMD 语法错误

sql-server - PySpark 中通过 JDBC 的 SQL Server

python - 通过删除循环来优化 Python 代码

pyspark - Spark 中覆盖的行为

python - Cython 函数中的字符串

python - 安装 Python 库后自动创建奇怪的文件