java - Kafka Streams 生产者上的自定义 header

标签 java apache-kafka apache-kafka-streams spring-cloud-stream

我在一个主题中有多个事件,我正在尝试按以下步骤进行处理:

  1. 根据 header 值过滤事件
  2. 应用解串器
  3. 按键分组
  4. 聚合以生成新的 KTable
  5. 新 KTable 将与具有新 header 的新事件流式传输到同一主题。

我可以使用transformValues访问 header ,但不确定如何在执行toStream时注入(inject)新的 header 值。

streamsBuilder.stream("my-topic")
.transformValues(new Transformer())//access headers here n filter few events
.groupByKey(Serialized.with(Serdes.String(),null)
.aggregate(()->my avro object initialization,(key,value,aggregate)->newValue(Value,aggregate),Materialized.as("my-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.ByteArray())
.mapValues((key,value)->convert to bytes).toStream()

注意:我是 KStream 的新手。

最佳答案

您可以使用Processor API添加自定义 header 。以与访问 header 相同的方式实现处理方法。

new Processor() { 
    ......
   @override
   public void process(String key, String value) {
       // add a header to the elements
       context().headers().add.("key", "key");
   }
   ...
}

关于java - Kafka Streams 生产者上的自定义 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57309765/

相关文章:

java对象流

java - 如何在 Vertx Kafka 客户端中使用自定义序列化器?

java - 使用 Kafka Streams 进行自定义转换

java - libgdx 在游戏重置时更改背景

Java-如何正确使用list.copyOf?

java - 将 JInternalFrames 从一个 JDesktopPane 移动到另一个 JDesktopPane

java - Spring-Kafka : Issue while deserialising kafka message - class not in a "trusted package"?

spring-boot - spring kafka中使用SeekToCurrentErrorHandler时如何设置重试间隔时间

java - Kafka Stream 的交互式调节

apache-kafka - 如何始终使用 kafka-streams 中的最新偏移量