我在 GenericRecord 中有一些值,我需要通过 Kafka 流计算这些不同的值。它必须是这样的 {"Distinct values count":123}。请帮我。我是 Kafka Streams 的新手
最佳答案
你可以这样做:
SteamsBuilder topology = new StreamsBuilder();
KTable<Integer, HashMap<String, Long>> aggregate = topology.stream("input")
.groupBy((k, v) -> 0 /*map all records to same, arbitrary key*/)
.aggregate(() -> new HashMap<String, Long>(),
(k, v, a) -> {
Long count = a.get(v.get("state"));
if (count == null) {
count = 0L;
}
a.put(v.get("state"), ++count);
return a;
});
(我省略了
Serdes
在这种情况下( Consumed
在 stream()
中, Serialized
在 groupBy()
中, Materialized
在 aggregate()
中),你需要自己提供。由此产生的
KTable
将是一个不同状态值的表,它们各自的计数。
关于apache-kafka - Kafka Streams 计算不同的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48500452/