python - 包装 pyspark Pipeline.__init__ 和装饰器

标签 python apache-spark pyspark

我正在尝试包装 pyspark Pipeline.init 构造函数的构造函数,并在新包装的构造函数中包装猴子补丁。但是,我遇到了一个似乎与 Pipeline.init 使用装饰器的方式有关的错误

这是实际执行猴子补丁的代码:

    def monkeyPatchPipeline():
      oldInit = Pipeline.__init__

      def newInit(self, **keywordArgs):
        oldInit(self, stages=keywordArgs["stages"])

      Pipeline.__init__ = newInit

但是,当我运行一个简单的程序时:

import PythonSparkCombinatorLibrary
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

PythonSparkCombinatorLibrary.TransformWrapper.monkeyPatchPipeline()
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

我收到这个错误:

Traceback (most recent call last):
  File "C:\<my path>\PythonApplication1\main.py", line 26, in <module>
   pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
  File "C:<my path>PythonApplication1 \PythonSparkCombinatorLibrary.py", line 36, in newInit
oldInit(self, stages=keywordArgs["stages"])
  File "C:\<pyspark_path>\pyspark\__init__.py", line 98, in wrapper
   return func(*args, **kwargs)
 File "C:\<pyspark_path>\pyspark\ml\pipeline.py", line 63, in __init__
    kwargs = self.__init__._input_kwargs
AttributeError: 'function' object has no attribute '_input_kwargs'

查看 pyspark 界面,我看到 Pipeline.init 看起来像这样:

@keyword_only
def __init__(self, stages=None):
    """
    __init__(self, stages=None)
    """
    if stages is None:
        stages = []
    super(Pipeline, self).__init__()
    kwargs = self.__init__._input_kwargs
    self.setParams(**kwargs)

注意到 @keyword_only 装饰器,我也检查了该代码:

def keyword_only(func):
    """
    A decorator that forces keyword arguments in the wrapped method
    and saves actual input keyword arguments in `_input_kwargs`.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        if len(args) > 1:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        wrapper._input_kwargs = kwargs
        return func(*args, **kwargs)
    return wrapper

我完全不明白这段代码最初是如何工作的,也不知道为什么它似乎会导致我自己的包装器出现问题。我看到包装器正在向自身添加一个 _input_kwargs 字段,但是 Pipeline.__init__ 将如何使用 self.__init__._input_kwargs 读取该字段?为什么当我再次包装 Pipeline.__init__ 时不会发生同样的事情?

最佳答案

Decorator 101. Decorator 是一个高阶函数,它将一个函数作为它的第一个参数(通常只有一个),并返回一个函数。 @注解只是一个简单函数调用的语法糖,所以下面

@decorator
def decorated(x):
    ...

例如可以重写为:

def decorated_(x):
    ...

decorated  = decorator(decorated_)

所以 Pipeline.__init__ 实际上是一个 functools.wrapped wrapper 捕获定义的 __init__(keyword_onlyfunc 参数)作为其闭包的一部分。当它被调用时,它使用接收到的 kwargs 作为 function attribute本身。基本上这里发生的事情可以简化为:

def f(**kwargs):
    f._input_kwargs = kwargs  # f is in the current scope

hasattr(f, "_input_kwargs")
False
f(foo=1, bar="x")

hasattr(f, "_input_kwargs")
True

当您进一步包装(装饰)__init__ 时,外部函数将不会附加 _input_kwargs,因此会出现错误。如果你想让它工作,你可以将与原始 __init__ 使用的相同的过程应用到你自己的版本,例如使用相同的装饰器:

@keyword_only
def newInit(self, **keywordArgs):
    oldInit(self, stages=keywordArgs["stages"])

但我喜欢我在评论中提到的,你应该考虑子类化。

关于python - 包装 pyspark Pipeline.__init__ 和装饰器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41833296/

相关文章:

java - 在spark中使用stanford nlp,错误 "Class java.util.function.Function not found - continuing with a stub."

scala - 由于 GC 开销限制,简单的 Spark 作业失败

pyspark - findspark.init() IndexError : list index out of range: PySpark on Google Colab

python - nltk 中的 Text.concordance() 是否可作为分布式方法用于 pyspark

python - 无法使用spyder anaconda导入tensorflow

python - Django 和 VirtualEnv 开发/部署最佳实践

java - Spark Streaming/Spark 在 main() 方法中是否像 while 循环一样工作?

python - 将列中的字符串作为嵌套 JSON 存储到 JSON 文件 - Pyspark

python - 使用理解反向采样嵌套列表

python - 通过 curl POST 将 csv 文件发送到 Django REST Framework 端点