java - 如何根据 Kafka Stream 的 JSON 内容过滤事件

标签 java json apache-kafka apache-kafka-streams

我正在使用 Kafka Streams 读取集群中的主题,并且我想根据其 JSON 内容过滤消息,即:

JSON 格式:

{
   "id": 1 
   "timestamp": "2019-04-21 12:53:18", 
   "priority": "Medium", 
   "name": "Sample Text",
   "metadata": [{
      "metric_1": "0", 
      "metric_2": "1", 
      "metric_3": "2"
   }]
}

我想从输入主题(让我们称之为“输入主题”)读取消息,过滤它们(假设我只想要优先级为“低”的消息),然后聚合这些消息,并将它们发送到另一个主题(“过滤主题”)

除了创建流本身及其配置之外,我没有那么多代码。我想肯定有一些关于 Serdes 的东西需要配置,但我不确定如何配置。我也尝试过使用 JSON 反序列化器,但无法让它工作。

首先,这可能吗?如果是这样,正确的做法是什么?

最佳答案

您可以从主题构建流。

    StreamsBuilder builder = new StreamsBuilder();

    // key value type here is both String for me and update based on cases
    KStream<String, String> source = builder.stream("input-topic");

    source.filter(new Predicate<String, String>() {
        @Override
        public boolean test(String s, String s2) {
            // your filter logic here and s and s2 are key/value from topic
            // In your case, s2 should be type of your json Java object
            return false;
        }
    }).groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String value) {
            // your group by logic
            return null;
        }
    }).count().toStream().to("new topic");

关于java - 如何根据 Kafka Stream 的 JSON 内容过滤事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55780331/

相关文章:

java - 为kafka java应用程序配置log4j

java - Druid 经纪人遇到 UnresolvedAddressException

java - 将 std::unique_ptr 传递给 JNI

java - 使用jsoup将html转换为纯文本时如何保留换行符?

javascript - JAX-RS JSON 对象到 JavaScript

php - 无法在 PHP 中将数据库获取到对象内的数组

javascript - 如何在React js api中以小数形式传递对象值

apache-kafka - 使用与 Kafka 主题消息键相同的 ROWKEY 创建 KSQL 表

java - 按降序对 jsonarray 数据进行排序

Java Scanner 类读取字符串