我想将 KStream 与 KTable 结合起来。两者都有不同的 key ,但使用自定义分区程序进行共同分区。但是,连接不会产生结果。
KStream具有以下结构
- 键:房屋 - 组
- 值:用户
KTable具有以下结构
- 键:用户 - 组
- 值:地址
为了确保每次插入时都按插入顺序处理两个主题,我使用自定义分区程序,使用每个键的组部分对两个主题进行分区。
我想最终得到以下结构的流:
- 键:房屋 - 组
- 值:用户 - 地址
为此,我正在执行以下操作:
val streamsBuilder = streamBuilderHolder.streamsBuilder
val houseToUser = streamsBuilder.stream<HouseGroup, User>("houseToUser")
val userToAddress = streamsBuilder.table<UserGroup, Address>("userToAddress")
val result: KStream<HouseGroup, UserWithAddress> = houseToUser
.map { k: HouseGroup, v: User ->
val newKey = UserGroup(v, k.group)
val newVal = UserHouse(v, k.house)
KeyValue(newKey, newVal)
}
.join(userToAddress) { v1: UserHouse, v2: Address ->
UserHouseWithAddress(v1, v2)
}
.map{k: UserGroup, v: UserHouseWithAddress ->
val newKey = HouseGroup(v.house, k.group)
val newVal = UserWithAddress(k.user, v.address)
KeyValue(newKey, newVal)
}
这需要匹配的连接,但没有成功。
我想明显的解决方案是加入全局表并放弃自定义分区程序。但是,我仍然不明白为什么上面的方法不起作用。
最佳答案
我认为缺少匹配是因为使用了不同分区器造成的。
对于您的输入主题,使用CustomPartitioner
。 Kafka Streams 默认使用 org.apache.kafka.clients. Producer.internals.DefaultPartitioner 。
在 KStream::join
之前的代码中,您调用了 KStream::map
。 KStream::map
函数在 KStream::join
之前强制执行重新分区。在重新分区期间,消息将刷新到 Kafka($AppName-KSTREAM-MAP-000000000X-repartition
主题)。为了传播消息,Kafka Streams 使用定义的分区器(属性:ProducerConfig.PARTITIONER_CLASS_CONFIG)。总结:具有相同键的消息可能位于“重新分区主题”和“KTable 主题”的不同分区
您的情况的解决方案将在 Kafka Streams 应用程序的属性中设置您的自定义分区(props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner"
)
要进行调试,您可以检查重新分区主题 ($AppName-KSTREAM-MAP-000000000X-repartition
)。具有相同键(如输入主题)的消息可能位于不同的分区(不同的数量)
关于java - 将 kafka-streams 与自定义分区器一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57930242/