添加Headers对于Kafka 0.11中的记录( ProducerRecord & ConsumerRecord ),在使用Kafka Streams处理主题时是否可以获取这些 header ?当在 KStream
上调用 map
等方法时,它提供记录的 key
和 value
参数,但不提供我可以看到访问标题
的方式。如果我们能够在 ConsumerRecord
上进行映射
,那就太好了。
例如。
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像这样的东西会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
最佳答案
自版本 2.0.0 起即可访问记录 header (有关详细信息,请参阅 KIP-244)。
您可以通过处理器 API 访问记录元数据(即通过 transform()
、transformValues()
或 process()
) ,通过给定的“上下文”对象(参见 https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context )。
更新
从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478 ),添加了一个新的类型安全的 api.Processor
类以及 process(Record)
而不是 process(K, V) 方法。对于这种情况,可以通过 Record
类访问 header (和记录元数据)。
此新功能在“DSL 的 PAPI 方法”中尚不可用(例如 KStream#process()
、KStream#transform()
和 sibling )。
+++++
在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳,但不公开在旧版本中读取时实际上由 Streams 删除的 header 。
元数据在 DSL 级别不可用。然而,通过 KIP-159 扩展 DSL 的工作也在进行中。 .
关于java - 是否可以使用 Kafka Streams 访问消息头?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60722488/