python - 无法更新 Pyspark 中的变量

标签 python scope pyspark

我正在尝试更新 pyspark 中的变量,并希望在另一种方法中使用相同的变量。我在类里面使用 @property ,当我在 python 中测试它时它按预期工作但是当我试图在 pyspark 中实现它时它没有更新变量。请帮我找出我做错了什么。

代码:

class Hrk(object):
    def __init__(self, hrkval):
        self.hrkval = hrkval

    @property
    def hrkval(self):
        return self._hrkval

    @hrkval.setter
    def hrkval(self, value):
        self._hrkval = value

    @hrkval.deleter
    def hrkval(self):
        del self._hrkval


filenme = sc.wholeTextFiles("/user/root/CCDs")

hrk = Hrk("No Value")

def add_demo(filename):
   pfname[]
   plname[]
   PDOB[]
   gender[]
   .......i have not mentioned my logic, i skipped that part......
   hrk.hrkval = pfname[0]+"@#"+plname[0]+PDOB[0]+gender[0]
   return (str(hrk.hrkval))


def add_med(filename):
   return (str(hrk.hrkval))


filenme.map(getname).map(add_demo).saveAsTextFile("/user/cloudera/Demo/")
filenme.map(getname).map(add_med).saveAsTextFile("/user/cloudera/Med/")  

在我的第一个方法调用 (add_demo) 中,我得到了正确的值,但是当我想在第二个方法中使用相同的变量时,我得到了 No Value 。我不知道为什么它不更新变量。类似的逻辑在 python 中运行良好。

最佳答案

您正在尝试使用 map 改变全局变量的状态应用程序接口(interface)。这不是 Spark 的推荐模式。你尽量使用纯函数,使用operations喜欢.reduce.reduceByKey.foldLeft .以下简化示例不起作用的原因是因为当 .map被调用时,spark 为函数 f1 创建了一个闭包, 创建 hrk 的副本每个“分区”的对象并将其应用于每个分区中的行。

import pyspark
import pyspark.sql

number_cores = 2
memory_gb = 1
conf = (
    pyspark.SparkConf()
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)

c = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SQLContext(sc)

class Hrk(object):
    def __init__(self, hrkval):
        self.hrkval = hrkval

    @property
    def hrkval(self):
        return self._hrkval

    @hrkval.setter
    def hrkval(self, value):
        self._hrkval = value

    @hrkval.deleter
    def hrkval(self):
        del self._hrkval

hrk = Hrk("No Value")
print(hrk.hrkval)
# No Value

def f1(x):
    hrk.hrkval = str(x)
    return "str:"+str(hrk.hrkval)


data = sc.parallelize([1,2,3])
data.map(f1).collect()
# ['str:1', 'str:2', 'str:3']

print(hrk.hrkval)
# No Value

您可以在 Understanding Closures 中阅读有关闭包的更多信息官方 spark 文档的 rdd 编程指南部分,这里有一些重要的片段:

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.

https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-

关于python - 无法更新 Pyspark 中的变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46072798/

相关文章:

C 逗号运算符

Python时区转换

python - 如何在 asyncio 中安排任务以使其在特定日期运行?

python - 有条件替换 Pandas

javascript - 将 JavaScript 变量添加到本地范围

c++ - 如何在函数中分配指向新对象的指针,而该对象在编辑后不会消失

python - Numpy中一维数组的滚动窗口?

java - PySpark:无法创建 SparkSession。(Java 网关错误)

python - 根据条件将列添加到 pyspark 数据框

apache-spark - 使用 group by 时在 Spark 中保留未使用的列?