apache-kafka - Kafka Streams 计算不同的值

标签 apache-kafka kafka-producer-api apache-kafka-streams

我在 GenericRecord 中有一些值,我需要通过 Kafka 流计算这些不同的值。它必须是这样的 {"Distinct values count":123}。请帮我。我是 Kafka Streams 的新手

最佳答案

你可以这样做:

SteamsBuilder topology = new StreamsBuilder();

KTable<Integer, HashMap<String, Long>> aggregate = topology.stream("input")
  .groupBy((k, v) -> 0 /*map all records to same, arbitrary key*/)
  .aggregate(() -> new HashMap<String, Long>(),
             (k, v, a) -> {
                Long count = a.get(v.get("state"));
                if (count == null) {
                    count = 0L;
                }
                a.put(v.get("state"), ++count);
                return a;
             });

(我省略了 Serdes 在这种情况下( Consumedstream() 中, SerializedgroupBy() 中, Materializedaggregate() 中),你需要自己提供。

由此产生的 KTable将是一个不同状态值的表,它们各自的计数。

关于apache-kafka - Kafka Streams 计算不同的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48500452/

相关文章:

java - Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

apache-kafka-streams - Spring Cloud Stream 中是否可以有多个@StreamListener?

java - 有没有办法通过 API 重置 Kafka 消费者组的偏移量?

apache-kafka - Apache Kafka 中的分区数量是否有上限

apache-kafka - 如何使用 Kafka 镜像制造商复制模式?

apache-kafka - Python librdkafka Producer 与 native Apache Kafka Producer 执行

python - 如何使用 Python 在 Kafka 中生成墓碑 Avro 记录?

apache-kafka - Kafka 事件携带的状态传输系统是否应该使用 GlobalKTable 进行本地查询来实现?

apache-kafka - KafkaStreams - InconsistentGroupProtocolException

java - 如何将流式 json 数据作为键值对发送到 kafka 消费者中