我在一个主题中有多个事件,我正在尝试按以下步骤进行处理:
- 根据 header 值过滤事件
- 应用解串器
- 按键分组
- 聚合以生成新的 KTable
- 新 KTable 将与具有新 header 的新事件流式传输到同一主题。
我可以使用transformValues访问 header ,但不确定如何在执行toStream时注入(inject)新的 header 值。
streamsBuilder.stream("my-topic")
.transformValues(new Transformer())//access headers here n filter few events
.groupByKey(Serialized.with(Serdes.String(),null)
.aggregate(()->my avro object initialization,(key,value,aggregate)->newValue(Value,aggregate),Materialized.as("my-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.ByteArray())
.mapValues((key,value)->convert to bytes).toStream()
注意:我是 KStream 的新手。
最佳答案
您可以使用Processor API添加自定义 header 。以与访问 header 相同的方式实现处理方法。
new Processor() {
......
@override
public void process(String key, String value) {
// add a header to the elements
context().headers().add.("key", "key");
}
...
}
关于java - Kafka Streams 生产者上的自定义 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57309765/