filter - 我可以在 kafka 流应用程序的 peek 或过滤器或分支中执行一些状态操作吗?

标签 filter apache-kafka apache-kafka-streams stateful

我们知道在kafka stream doc, peek, filter, branch是无状态操作吗? 但是,我想在这个处理器中做一些有状态的操作吗? 例如,我想做一些查询,并根据结果过滤消息,我可以这样做吗?

最佳答案

peek()filter()branch() 操作本质上是无状态的。当你说:

I wanna do some query, and filter messages base the results

这取决于你想查询什么?可以(但不推荐)查询“外部”API。但是,它没有内置支持,并且需要考虑许多极端情况以使其健壮。请注意,查询外部系统不会使操作有状态

如果你想使用状态,你可以使用 transform() (和 sibling )并构建自定义运算符。如果您命名所有下游运算符(通过 Named 和类似方式),您可以使用 context.forward(..., To.child(...)) 来实现自定义分支。对于过滤,您可以返回 null 以不转发任何内容。

不确定有状态 peek() 的用途,但您也可以这样做。

根据用例,还可以通过流表连接或流全局表连接实现“状态过滤器”。

关于filter - 我可以在 kafka 流应用程序的 peek 或过滤器或分支中执行一些状态操作吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61099753/

相关文章:

javascript - 过滤数字数组,其中 0 是有效输入

python - 根据日期键的值过滤字典

apache-kafka - vert.x事件总线可以代替Kafka的需要吗?

scala - Spark Structured Streaming with Kafka - 如何重新分区数据并在工作节点之间分配处理

java - 在 kStreams 中使用 lambda 连接 Avro 格式数据

apache-kafka - 具有状态存储的 Kafka 有状态流处理器 : Behind the scenes

javascript - 使用 forEach 过滤数组

maven-2 - 使用 Ant 模拟 Maven2 过滤机制

apache-kafka - 如何监控Kafka主题中的消息率​​?

apache-kafka - 当存储值是 Avro SpecificRecord 时,KafkaStreamsStateStore 不工作