java - 使用kafka流根据消息 key 向主题发送消息

标签 java apache-kafka apache-kafka-streams

我希望能够根据消息键的键将 Kafkastream 中的所有记录发送到不同的主题。 前任。 Kafka 中的流包含名称作为键和记录作为值。我想根据记录的键将这些记录扇出到不同的主题

数据:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}), 预计

  • topic1 :name: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})
  • topic2 :sean-> (sean -> {seansRecord})
  • topic3 :mary -> (mary -> {marysRecord})

下面是我现在执行此操作的方式,但是由于名称列表很慢,所以速度很慢。另外即使记录了几个名字,我也需要遍历整个列表请建议修复

    for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 

最佳答案

我认为你应该使用 KStream::to(final TopicNameExtractor<K, V> topicExtractor)功能。它使您能够计算每条消息的主题名称。

示例代码:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);

关于java - 使用kafka流根据消息 key 向主题发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57979679/

相关文章:

Java从字符串中提取部分的最佳方法

apache-kafka - Kafka主题中如何写KTable?就像我们在 KStreams 中使用 "to()"如何为 KTable 那样做?

apache-spark - Hadoop 数据摄取

apache-kafka - 使用 Kafka 的日志压缩,如果消费者没有落后于某个定义的时间限制,我该怎么做才能保证消费者不会错过消息?

apache-kafka-streams - Transform() 后的 KStream/KTable leftjoin 导致 : StreamsException: A serializer is not compatible to the actual key or value type

java - Apache Camel XML 到 JSON

java - JFrame 最大化时 ScrollBar 不会调整大小

java - 使用 Maven 创建 EAR 文件

java - Scala - 如何过滤 KStream(Kafka Streams)

apache-kafka - Kafka Streams - 如何更好地控制内部创建的状态存储主题的分区?