apache-spark - PySpark 计数在 RDD 中按组区分

标签 apache-spark pyspark

我有一个日期时间和主机名作为 tuple 的 RDD,我想按日期计算唯一主机名。

RDD:

X = [(datetime.datetime(1995, 8, 1, 0, 0, 1), u'in24.inetnebr.com'),
     (datetime.datetime(1995, 8, 1, 0, 0, 7), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 1, 0, 0, 8), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 2, 0, 0, 8), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 2, 0, 0, 8), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 2, 0, 0, 9), u'ix-esc-ca2-07.ix.netcom.com'),
     (datetime.datetime(1995, 8, 3, 0, 0, 10), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 3, 0, 0, 10), u'slppp6.intermind.net'),
     (datetime.datetime(1995, 8, 4, 0, 0, 10), u'piweba4y.prodigy.com'),
     (datetime.datetime(1995, 8, 5, 0, 0, 11), u'slppp6.intermind.net')]

期望的输出:

[(datetime.datetime(1995, 8, 1, 0, 0, 1), 2),
 (datetime.datetime(1995, 8, 2, 0, 0, 8), 2),
 (datetime.datetime(1995, 8, 3, 0, 0, 10), 2),
 (datetime.datetime(1995, 8, 4, 0, 0, 10), 1),
 (datetime.datetime(1995, 8, 5, 0, 0, 11), 1)]

我的尝试:

dayGroupedHosts = X.groupBy(lambda x: x[0]).distinct()
dayHostCount = dayGroupedHosts.count()

我在执行 count 操作时遇到错误。我是 Spark 的新手,我想知道正确有效的 transformation 来完成这些任务。

提前非常感谢。

最佳答案

您需要先将键转换为日期。然后按 key 分组,计算不同的值:

X.map(lambda x: (x[0].date(), x[1]))\
    .groupByKey()\
    .mapValues(lambda vals: len(set(vals)))\
    .sortByKey()\
    .collect()
#[(datetime.date(1995, 8, 1), 2),
# (datetime.date(1995, 8, 2), 2),
# (datetime.date(1995, 8, 3), 2),
# (datetime.date(1995, 8, 4), 1),
# (datetime.date(1995, 8, 5), 1)]

关于apache-spark - PySpark 计数在 RDD 中按组区分,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53917221/

相关文章:

python - 如何在 PySpark 中将行值(时间序列)转置为列值?

scala - Spark - 更改数据集中属于长尾的记录的值

python - Pyspark 管道中用户定义的变压器

apache-spark - Spark - SparkSession 应该只有一个吗?

apache-spark - 重启 Spark Streaming 应用程序的最佳方法是什么?

hadoop - 在Spark中充分利用内存

python - 如何在新的 Spark session 中再次读取 Spark 表?

python - Spark (2.2) 性能 <-> Spark 持久

apache-spark - Spark 物理计划和逻辑计划

python - 使用 python Spark 结构化流