java - Kafka - 如何同时使用 filter 和 filternot?

标签 java apache-kafka apache-kafka-streams

我有一个从主题获取数据的 Kafka 流,需要将该信息过滤到两个不同的主题。

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");

但是,当我这样做时,它会从主题中读取数据两次——不确定随着数据变大这是否会对性能产生任何影响。有没有办法只过滤一次并推送到两个主题?

最佳答案

您的方法是正确的,数据未从主题中读取两次,并且没有内部数据复制正在进行。您的方法的唯一缺点是,两个过滤器谓词都会针对每条记录进行评估——但是,这是非常便宜的,不应该成为性能问题。

但是,您仍然可以通过使用 KStream#branch() 来提高性能,它确实采用多个谓词并依次评估所有谓词并为每个谓词返回一个输入流。如果记录与谓词匹配,则将其放入相应的输出流中并停止评估(即,不再为该单个记录评估进一步的谓词——这确保每个记录被添加到最大一个输出流;或者如果没有谓词匹配)。

因此,您可以只向 branch() 提供两个谓词:第一个与原始 filter() 谓词相同,第二个谓词始终返回

KStream<String, Model> stream = builder.stream(
    Serdes.String(),
    specificAvroSerde,
    "not-filtered-topic"
);
KStream[] splitStreams = stream.branch(
    (key, value) -> new Processor().test(key,value),
    (key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");

但不确定此代码是否比您的原始版本更易读。我想这是一个品味问题,我个人更喜欢你的原始代码,因为它确实更好地表达了语义。

我添加的版本应该稍微提高 CPU 效率,因为对于所有满足谓词的记录,它只被评估一次。对于所有不满足结果的记录,将返回一个简单的 true(即,没有第二个谓词评估)。

如果您知道大多数记录将在 splitStream[1] 中结束,您还可以反转谓词(并使用 splitStream[0] 作为“bad-stream ") 以减少对第二个 true 返回谓词的调用次数。但这些只是微优化,应该无关紧要。

关于java - Kafka - 如何同时使用 filter 和 filternot?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40918158/

相关文章:

apache-kafka-streams - Materialized<> 类型中的 withValueSerde() 方法不适用

apache-kafka - 即使没有消费者,消费者组仍停留在 'rebalancing'

java - 按代码点读取文本流代码点

java - 使异步代码阻塞

apache-kafka - 卡夫卡消费者标记协调员2147483647死亡

apache-kafka - 在 StreamsConfig 中找不到 KEY_SERIALIZER_CLASS_CONFIG

java - Kafka 流创建一个简单的物化 View

Java 类型-细化映射

javascript - Ajax 参数在传递到 Java Controller 时会被加密

java - 如果消息是由生产者产生的,如何从卡夫卡经纪人那里得到确认?