我希望能够根据消息键的键将 Kafkastream 中的所有记录发送到不同的主题。 前任。 Kafka 中的流包含名称作为键和记录作为值。我想根据记录的键将这些记录扇出到不同的主题
数据:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}), 预计
- topic1 :name: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})
- topic2 :sean-> (sean -> {seansRecord})
- topic3 :mary -> (mary -> {marysRecord})
下面是我现在执行此操作的方式,但是由于名称列表很慢,所以速度很慢。另外即使记录了几个名字,我也需要遍历整个列表请建议修复
for( String name : names )
{
recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
}
最佳答案
我认为你应该使用 KStream::to(final TopicNameExtractor<K, V> topicExtractor)
功能。它使您能够计算每条消息的主题名称。
示例代码:
final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);
关于java - 使用kafka流根据消息 key 向主题发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57979679/