python - 在 Spark Streaming 中查找中位数

标签 python numpy apache-spark spark-streaming

我正在尝试编写最简单的代码示例:

from numpy import median
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 30)

qs = ssc.queueStream([[1,2,3],[4,5],[6,7,8,9,9]])
output = qs.foreachRDD(median)

output.pprint()

ssc.start(); ssc.awaitTermination()

我想为流中的每个 rdd 生成中位数。我的直播每 30 秒一次。 为了测试我的代码,我创建了一个queueStream

当我查看输出类型时,我得到以下结果:

 type(output)
<type 'NoneType'>

为什么会出现这样的情况呢?当我尝试使用 map 将中值应用于我的流时,它一次将中值函数应用于列表中的每个成员。我想将中值函数作为一个聚合应用于整个 RDD,因此 map 函数是不可能的。

如何计算 Spark Streaming 中流的中位数?

最佳答案

扩展@Justin 的答案:发生了什么:

median()

分别应用于每个 DStream。然而结果没有被任何人使用..为什么? foreachRdd() 是一个操作,而不是一个转换。

您应该查看 DStream 转换:例如map():这是尚未 100% 调试的代码 - 但它提供了一个结构:

from pyspark.streaming import *
ssc = StreamingContext(sc, 30)
dataRdd = [sc.parallelize(d, 1) for d in [[1,2,3],[4,5],[6,7,8,9,9]]]
qs = ssc.queueStream(dataRdd)

def list_median((med,mylist),newval):
    mylist = [newval] if not mylist else mylist.append(newval)
    mylist = sorted(mylist)
    return (mylist[int(len(mylist)/2)], mylist)

medians = qs.reduce(list_median).map(lambda (med,list): med)
def printRec(rdd):
    import sys
    rdd.foreach(lambda rec: sys.stderr.write(repr(rec)))

medians.foreachRDD(printRec)
ssc.start(); ssc.awaitTermination()

关于python - 在 Spark Streaming 中查找中位数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29112610/

相关文章:

scala - Spark 流示例使用附加参数调用 updateStateByKey

if-statement - IF声明Pyspark

python - numpy.median 在屏蔽数组上的意外行为

python - 比较列表列表中的 a 并使用 python 添加不同值的最佳方法

python 列表回溯的排列

python - 查找单词的所有出现+子串

python - 从小对数概率向量中以numpy/scipy采样多项式

Python base64编码然后解码通用对象

python - PySpark XML 到带时间序列数据的 JSON

python - 如何使用 python 2.7 关闭 cmd 窗口