java - Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

标签 java apache-kafka apache-kafka-streams

我如何在 Kafka Streams API 中使用具有多个约束的 .groupby。与下面的 Java 8 Streams API 示例相同

public void twoLevelGrouping(List<Person> persons) {
     final Map<String, Map<String, List<Person>>> personsByCountryAndCity = persons.stream().collect(
         groupingBy(Person::getCountry,
            groupingBy(Person::getCity)
        )
    );
    System.out.println("Persons living in London: " + personsByCountryAndCity.get("UK").get("London").size());
}

最佳答案

您可以通过将要分组的所有属性/字段放入键中来指定组合键。

KTable table = stream.selectKey((k, v,) -> k::getCountry + "-" + k::getCity)
                     .groupByKey()
                     .aggregate(...); // or maybe .reduce()

我假设国家和城市都是String。您使用交互式查询来查询商店

store.get("UK-London");

https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html

关于java - Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48927093/

相关文章:

Java Swing : Add a component by code in NetBeans

docker - Kafka 消费者组偏移量下降到 -1

java - 基于选择的更改事件的不同表单操作

java - 如何将对象从获取 Controller 提交到后置 Controller ?

mysql - 卡夫卡连接-jdbc : SQLException: No suitable driver only when using distributed mode

apache-kafka - 如何从头开始使用Kafka Consumer API读取数据?

java - 通过与 kafka-streams 的连接批量处理数据导致 `Skipping record for expired segment`

apache-kafka - 测试 KafkaStreams 应用程序

docker - Kafka流窗口聚合几乎可以正常工作

java - 使用Auditing时如何自定义Spring-Data注入(inject)的AuditingHandler?