我正在使用 Kafka Streams 读取集群中的主题,并且我想根据其 JSON 内容过滤消息,即:
JSON 格式:
{
"id": 1
"timestamp": "2019-04-21 12:53:18",
"priority": "Medium",
"name": "Sample Text",
"metadata": [{
"metric_1": "0",
"metric_2": "1",
"metric_3": "2"
}]
}
我想从输入主题(让我们称之为“输入主题”)读取消息,过滤它们(假设我只想要优先级为“低”的消息),然后聚合这些消息,并将它们发送到另一个主题(“过滤主题”)
除了创建流本身及其配置之外,我没有那么多代码。我想肯定有一些关于 Serdes 的东西需要配置,但我不确定如何配置。我也尝试过使用 JSON 反序列化器,但无法让它工作。
首先,这可能吗?如果是这样,正确的做法是什么?
最佳答案
您可以从主题构建流。
StreamsBuilder builder = new StreamsBuilder();
// key value type here is both String for me and update based on cases
KStream<String, String> source = builder.stream("input-topic");
source.filter(new Predicate<String, String>() {
@Override
public boolean test(String s, String s2) {
// your filter logic here and s and s2 are key/value from topic
// In your case, s2 should be type of your json Java object
return false;
}
}).groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
// your group by logic
return null;
}
}).count().toStream().to("new topic");
关于java - 如何根据 Kafka Stream 的 JSON 内容过滤事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55780331/