我们知道在kafka stream doc, peek, filter, branch是无状态操作吗? 但是,我想在这个处理器中做一些有状态的操作吗? 例如,我想做一些查询,并根据结果过滤消息,我可以这样做吗?
最佳答案
peek()
、filter()
和 branch()
操作本质上是无状态的。当你说:
I wanna do some query, and filter messages base the results
这取决于你想查询什么?可以(但不推荐)查询“外部”API。但是,它没有内置支持,并且需要考虑许多极端情况以使其健壮。请注意,查询外部系统不会使操作有状态。
如果你想使用状态,你可以使用 transform()
(和 sibling )并构建自定义运算符。如果您命名所有下游运算符(通过 Named
和类似方式),您可以使用 context.forward(..., To.child(...))
来实现自定义分支。对于过滤,您可以返回 null
以不转发任何内容。
不确定有状态 peek() 的用途,但您也可以这样做。
根据用例,还可以通过流表连接或流全局表连接实现“状态过滤器”。
关于filter - 我可以在 kafka 流应用程序的 peek 或过滤器或分支中执行一些状态操作吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61099753/