java - 将 kafka-streams 与自定义分区器一起使用

标签 java apache-kafka apache-kafka-streams

我想将 KStream 与 KTable 结合起来。两者都有不同的 key ,但使用自定义分区程序进行共同分区。但是,连接不会产生结果。

KStream具有以下结构
- 键:房屋 - 组
- 值:用户
KTable具有以下结构
- 键:用户 - 组
- 值:地址

为了确保每次插入时都按插入顺序处理两个主题,我使用自定义分区程序,使用每个键的组部分对两个主题进行分区。

我想最终得到以下结构的流:
- 键:房屋 - 组
- 值:用户 - 地址

为此,我正在执行以下操作:

val streamsBuilder = streamBuilderHolder.streamsBuilder
val houseToUser = streamsBuilder.stream<HouseGroup, User>("houseToUser")
val userToAddress = streamsBuilder.table<UserGroup, Address>("userToAddress")
val result: KStream<HouseGroup, UserWithAddress> = houseToUser
        .map { k: HouseGroup, v: User ->
            val newKey = UserGroup(v, k.group)
            val newVal = UserHouse(v, k.house)
            KeyValue(newKey, newVal)
        }
        .join(userToAddress) { v1: UserHouse, v2: Address ->
            UserHouseWithAddress(v1, v2)
        }
        .map{k: UserGroup, v: UserHouseWithAddress ->
            val newKey = HouseGroup(v.house, k.group)
            val newVal = UserWithAddress(k.user, v.address)
            KeyValue(newKey, newVal)
        }

这需要匹配的连接,但没有成功。

我想明显的解决方案是加入全局表并放弃自定义分区程序。但是,我仍然不明白为什么上面的方法不起作用。

最佳答案

我认为缺少匹配是因为使用了不同分区器造成的。

对于您的输入主题,使用CustomPartitioner。 Kafka Streams 默认使用 org.apache.kafka.clients. Producer.internals.DefaultPartitioner 。

KStream::join 之前的代码中,您调用了 KStream::mapKStream::map 函数在 KStream::join 之前强制执行重新分区。在重新分区期间,消息将刷新到 Kafka($AppName-KSTREAM-MAP-000000000X-repartition 主题)。为了传播消息,Kafka Streams 使用定义的分区器(属性:ProducerConfig.PARTITIONER_CLASS_CONFIG)。总结:具有相同键的消息可能位于“重新分区主题”和“KTable 主题”的不同分区

您的情况的解决方案将在 Kafka Streams 应用程序的属性中设置您的自定义分区(props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner" )

要进行调试,您可以检查重新分区主题 ($AppName-KSTREAM-MAP-000000000X-repartition)。具有相同键(如输入主题)的消息可能位于不同的分区(不同的数量)

关于 Join co-partitioning requirements 的文档

关于java - 将 kafka-streams 与自定义分区器一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57930242/

相关文章:

java - 如何永久缓存加载的数据(图像)?

java - 如何获取存储在 google-cloud-storage 存储桶子目录中的图像的 URL

java - 尽管mapWithState中的元素相同,但为什么所有元素都被打印

apache-kafka - Kafka Streams 窗口加入保留

apache-kafka - 我应该用什么 : Kafka Stream or Kafka consumer api or Kafka connect

java - Confluence Kafka Streams - 找不到类 io.confluence.connect.avro.ConnectDefault

java - 似乎无法将文件读入矩阵?

java - 数据库和Java

python - PyKafka producer.get_delivery_report 在 block=false 时抛出 Queue.empty

java - 为 Kafka 流上的窗口数据创建 SerDes