java - Apache Kafka - 实现 KTable

标签 java apache-kafka apache-kafka-streams ktable

我是 Kafka Streams API 的新手,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic,它是一个json格式的消息,如下所示。

{ "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid":"3.17.2493", 
             "csn":"64913009"},
  "op_type":"I",
  "after":  { "CODE":"AAAA41",
              "STATUS":"COMPLETED",
              "ID":24},
  "op_ts":"2019-12-24 13:16:40.316941",
  "table":"S_ORDER"} 

我阅读了此主题的消息,并且想要创建一个 KTable,其中 key 为字段 "after":"ID" 对于“after” 字段内的所有字段(“ID” 除外)。

仅当我使用默认聚合函数(即计数)时,我才成功创建了 KTable。但我很难创建自己的聚合函数。下面我展示了我尝试创建 KTable 的部分代码。

KTable<Long, String> s_table = builder.stream("s-order-topic",  Consumed.with(Serdes.Long(),Serdes.String()))
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                })
               .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return json.getLong("ID");
                }, Grouped.with(Serdes.Long(), Serdes.String()))
                .aggregate( ... );

我如何实现这个KTable?

我是否正确地解决了这个问题?

(mapValues -> 仅保留“before”/“after”字段。groupBy -> 将 ID 作为消息的键。聚合 -> ? )

最佳答案

我为我的案例找到了解决方案。我实现的KTable如下所示:

 KTable<String, String> s_table = builder.stream("s-order-topic",  Consumed.with(Serdes.String(),Serdes.String()))
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                })
                .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return String.valueOf(json.getLong("ID"));
                }, Grouped.with(Serdes.String(), Serdes.String()))
                .reduce((prev,newval)->newval);

aggregate 函数不适合这种情况,我使用了 reduce 函数。

控制台消费者的输出如下所示:

15   {"CODE":"AAAA17","STATUS":"PENDING","ID":15}
18   {"CODE":"AAAA50","STATUS":"SUBMITTED","ID":18}
4    {"CODE":"AAAA80","STATUS":"SUBMITTED","ID":4}
19   {"CODE":"AAAA83","STATUS":"SUBMITTED","ID":19}
18   {"CODE":"AAAA33","STATUS":"COMPLETED","ID":18}
5    {"CODE":"AAAA38","STATUS":"PENDING","ID":5}
10   {"CODE":"AAAA1","STATUS":"COMPLETED","ID":10}
3    {"CODE":"AAAA68","STATUS":"NOT COMPLETED","ID":3}
9    {"CODE":"AAAA89","STATUS":"PENDING","ID":9}

关于java - Apache Kafka - 实现 KTable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59628222/

相关文章:

java - 我正在为 Java Stream 中的泛型而苦苦挣扎

java - 在 for 循环中继续使用标签

apache-kafka - 卡夫卡连接器中的动态主题

apache-kafka - 卡夫卡流 : "TopicAuthorizationException: Not authorized to access topics" for an internal state store

java - Kafka GroupTable 测试使用 ProcessorTopologyTestDriver 时生成额外消息

java - spring cloud aws 项目不再有活跃的所有者了吗?

java - OrientDB 的 ShortestPath 查询太慢

java - 在 REST 端点中流式传输 Kafka 消息

mysql - 卡夫卡连接-jdbc : SQLException: No suitable driver only when using distributed mode

apache-kafka-streams - Kafka 流并发行为