我有一个 Kafka 服务器,每 n
分钟生成如下数据:
[('a', 123), ('b', 87), ('c', 101)]
我希望我的 Spark 应用程序保留表单的计数器
计数器 = {'a': 1, 'b': 0, 'c': 1}
如果成对,例如,字典(或任何其他适当的数据结构)值会递增。 ('a', Score)
,例如分数 > 100。
下次我使用来自 Kafka 的数据时,如果 (a, Score)
对的分数再次 > 100,我希望 counter['a']
为添加一个单位,使其等于2
。
最佳答案
您可以使用collections.Counter
:
data = [('a', 123), ('b', 87), ('c', 101)]
from collections import Counter
mycounter = Counter() # initiate the Counter
然后使用update
方法来增加计数:
# update the counter with your transformed data
mycounter.update({k: 1 if v > 100 else 0 for k, v in data})
mycounter
# Counter({'a': 1, 'b': 0, 'c': 1})
第二次更新:
mycounter.update({k: 1 if v > 100 else 0 for k, v in data})
mycounter
# Counter({'a': 2, 'b': 0, 'c': 2})
关于apache-spark - Spark 应用程序中的持久计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45987846/