python - PySpark 在 RDD 上运行多个函数

标签 python hadoop apache-spark

你好,我有示例代码:

for column in position:
    myData = dataSplit.map(lambda arr: (arr[column]))\
        .map(lambda line: line.split(','))\
        .map(lambda fields: ("Column", fields[0]))\
        .map(lambda (column, value) : value)\
        .filter(lambda line : filterWithAccum(line))\
        .map(lambda (value) : float(value))\
        .persist(StorageLevel.MEMORY_AND_DISK)
    results.append(myData.sum())
    results.append(myData.stats())
    results.append(myData.variance())
    results.append(myData.sampleStdev())
    results.append(myData.sampleVariance())

有没有一种方法可以在一个 passultiple 函数中运行,而不是每个 1 个函数运行 5 次 pass?坚持节省了很多时间,但我觉得必须有更好的方法来浓缩这些。我最初有 .min() .max() .mean() 但是 .stats() 为你做了那些,所以已经浓缩了一些。

最佳答案

我不明白你的问题,但是 .stats() 方法返回的 StatCounter 对象已经有 sum,方差sampleStddevsampleVariance 字段。所以你可以这样做

statCounter = myData.stats()
results.append(statCounter.sum())
results.append(statCounter.min())
results.append(statCounter.variance())
results.append(statCounter.sampleStdev())
results.append(statCounter.sampleVariance())

关于python - PySpark 在 RDD 上运行多个函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28967759/

相关文章:

python - 导入无显示的 Matplotlib

hadoop - Flume - 如果客户端批量大小大于 channel 事务容量会发生什么?

hadoop - 是否可以在 Apache Flume 中加入很多文件?

java - Spark 蓄能器

scala - Scala 中的 =!= 运算符是什么?

apache-spark - Spark中的错误 "unresolved operator InsertIntoStatement LogicalRDD"是什么意思?

python - 如何将 python 列表与共享项合并到新列表中

python - Autopep8 不会打破长注释行?

Python 告诉我应该在我放的地方缩进

hadoop - 从本地主机调用,因连接异常而失败