apache-spark - Spark 应用程序中的持久计数器

标签 apache-spark pyspark apache-spark-sql

我有一个 Kafka 服务器,每 n 分钟生成如下数据:

[('a', 123), ('b', 87), ('c', 101)]

我希望我的 Spark 应用程序保留表单的计数器

计数器 = {'a': 1, 'b': 0, 'c': 1}

如果成对,例如,字典(或任何其他适当的数据结构)值会递增。 ('a', Score),例如分数 > 100。

下次我使用来自 Kafka 的数据时,如果 (a, Score) 对的分数再次 > 100,我希望 counter['a'] 为添加一个单位,使其等于2

最佳答案

您可以使用collections.Counter:

data = [('a', 123), ('b', 87), ('c', 101)]

from collections import Counter    ​
mycounter = Counter()         # initiate the Counter

然后使用update方法来增加计数:

# update the counter with your transformed data
mycounter.update({k: 1 if v > 100 else 0 for k, v in data})    
mycounter
# Counter({'a': 1, 'b': 0, 'c': 1})

第二次更新:

mycounter.update({k: 1 if v > 100 else 0 for k, v in data})
mycounter
# Counter({'a': 2, 'b': 0, 'c': 2})

关于apache-spark - Spark 应用程序中的持久计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45987846/

相关文章:

scala - 了解 Kryo 序列化缓冲区溢出错误

apache-spark - Spark - 日期与时间戳比较 - 无意义的结果 `2018-01-01` 小于 `2018-01-01 00:00:00`

python - 使用 log4j conversionpattern 进行 Pyspark 日志记录不起作用

scala - 在 Spark SQL 中将数组作为 UDF 参数传递

python - Spark 数据框更新列,其中其他列与 PySpark 类似

scala - 如何获得两个DataFrame之间的对称差异?

python - 在 Spark 数据框中生成可重复的唯一 ID

scala - 如何将 show 运算符的输出读回数据集?

python - 将 Pyspark RDD 拆分为不同的列并转换为 Dataframe

java - Databricks Spark 笔记本在运行之间重复使用 Scala 对象?