java - 是否可以使用 Kafka Streams 访问消息头?

标签 java apache-kafka apache-kafka-streams

添加Headers对于Kafka 0.11中的记录( ProducerRecord & ConsumerRecord ),在使用Kafka Streams处理主题时是否可以获取这些 header ?当在 KStream 上调用 map 等方法时,它提供记录的 keyvalue 参数,但不提供我可以看到访问标题的方式。如果我们能够在 ConsumerRecord 上进行映射,那就太好了。

例如。

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...

最佳答案

自版本 2.0.0 起即可访问记录 header (有关详细信息,请参阅 KIP-244)。

您可以通过处理器 API 访问记录元数据(即通过 transform()transformValues()process()) ,通过给定的“上下文”对象(参见 https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context )。

更新

从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478 ),添加了一个新的类型安全的 api.Processor 类以及 process(Record) 而不是 process(K, V) 方法。对于这种情况,可以通过 Record 类访问 header (和记录元数据)。

此新功能在“DSL 的 PAPI 方法”中尚不可用(例如 KStream#process()KStream#transform() 和 sibling )。

+++++

在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳,但不公开在旧版本中读取时实际上由 Streams 删除的 header 。

元数据在 DSL 级别不可用。然而,通过 KIP-159 扩展 DSL 的工作也在进行中。 .

关于java - 是否可以使用 Kafka Streams 访问消息头?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60722488/

相关文章:

java - 解释java数组中的数据

hadoop - Kafka HDFS 2接收器连接器无法在HDFS上写入

scala - KTable 应该发出的事件

java - 跨 Kafka 分区对消息进行排序并将其放入另一个 Kafka 主题中

java - 在Android Studio中运行我的应用程序时出现DuplicatedError

java - 将来自不同社交网络的 friend 列表合并到一个列表中的好算法?

apache-kafka - 如何使用 spring-kafka 为监听器传递多个引导服务器

apache-kafka - 卡夫卡流 : use the same `application.id` to consume from multiple topics

apache-kafka - Apache 卡夫卡 : Check existence of message in a Topic

java - Hibernate - 在实体集合中设置 null 会在事务提交时自动保留