python - 将 lambda 函数转换为常规函数

标签 python apache-spark pyspark

我需要将以下涉及多个 lambda 函数的代码转换为单独的“常规”函数。我该怎么做?

我现在正在开发 Pyspark。数据源是一个RDD

result = rdd.filter(lambda x: x[0]=='9439').map(lambda x: (x[0], json.loads(x[1])['exposures'])).flatMapValues(lambda x: x).map(lambda x: {'serial_no' : x[0], **x[1]})

我的尝试:

def acct_tuple(x):
    return (x[0], json.loads(x[1])['exposures'])


def flat_map(x):
    return x

def get_tuple(x):
    return {'serial_no': x[0], **x[1]}


rdd = rdd.map(acct_tuple(x)).flatMapValues(flat_map(x)).map(get_tuple(x))

有更好的方法吗?

最佳答案

您应该将函数本身作为参数传递,而不是调用它,因此:

def acct_tuple(x):
    return (x[0], json.loads(x[1])['exposures'])


def flat_map(x):
    return x

def get_tuple(x):
    return {'serial_no': x[0], **x[1]}


rdd = rdd.map(acct_tuple).flatMapValues(flat_map).map(get_tuple)

关于python - 将 lambda 函数转换为常规函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58175984/

相关文章:

apache-spark - Spark Streaming 批处理之间的数据共享

python - 使用 pyspark 从字典中映射数据框中的值

python - 在 Apache Spark 中使用 pyspark 转置 Dataframe

python - 在 tqdm 中获取后缀字符串

java - 为什么SparkSQL中org.apache.spark.sql.types.DecimalType的最大精度值为38?

apache-spark - Spark 1.6 kafka 流式处理 dataproc py4j 错误

pyspark - 从 .egg 而不是 .py 运行 PySpark 作业

java - 对于 Selenium,我需要启动 java 服务器吗?

python - Sklearn 模型系数并预测 linear_model 中的不匹配

python - 如何将参数传递给py2neo中的密码查询