apache-kafka-streams - 根据部分数据属性更新KTable

标签 apache-kafka-streams spring-kafka spring-cloud-stream

我正在尝试使用对象的部分数据更新 KTable。 例如。用户对象是 {"id":1, "name":"Joe", "age":28} 该对象被流式传输到主题中并按键分组到 KTable 中。 现在,用户对象已部分更新,如下 {"id":1, "age":33} 并流入表中。但更新后的表如下所示 {"id":1, "name":null, "age":28}。 预期输出为 {"id":1, "name":"Joe", "age":33}。 如何使用 Kafka Streams 和 Spring Cloud Streams 来实现预期的输出。任何建议,将不胜感激。谢谢。

这是代码

 @Bean
        public Function<KStream<String, User>, KStream<String, User>> process() {
            return input -> input.map((key, user) -> new KeyValue<String, User>(user.getId(), user))
                    .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(User.class))).reduce((user1, user2) -> {
                        user1.merge(user2);
                        return user1;
                    }, Materialized.as("allusers")).toStream();
        }

并使用以下代码修改了 User 对象:

    public void merge(Object newObject) {
        assert this.getClass().getName().equals(newObject.getClass().getName());
        for (Field field : this.getClass().getDeclaredFields()) {
            for (Field newField : newObject.getClass().getDeclaredFields()) {
                if (field.getName().equals(newField.getName())) {
                    try {
                        field.set(this, newField.get(newObject) == null ? field.get(this) : newField.get(newObject));
                    } catch (IllegalAccessException ignore) {
                    }
                }
            }
        }
    }

这是正确的方法还是 KStreams 中的任何其他方法?

最佳答案

我已经测试了您的合并代码,它似乎按预期工作。但是由于 reduce 后的结果是 {"id":1, "name":null, "age":28},我可以想到两件事:

  • 您的状态根本没有更新,因为没有任何属性发生更改。
  • 也许您遇到了序列化问题,因为 string 属性为 null,但其他 int 属性都很好。

我的猜测是,因为您正在改变原始对象并返回相同的值,所以 kafka 流不会将其检测为更改,并且不会存储新状态。实际上,您不应该改变您的对象,因为它可能会导致不确定性,具体取决于您的管道。

尝试更改您的 merge 函数以创建新的 User 对象,并查看行为是否发生变化。

关于apache-kafka-streams - 根据部分数据属性更新KTable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58960806/

相关文章:

java - 如何使用 null 键对 kstream 执行操作?

java - 使用 Spring Kafka 反序列化来自同一 Kafka 主题的不同 JSON 有效负载

java - 在 Kafka 中重播消息

spring-integration - 如何在Spring Cloud Stream中手动确认RabbitMQ消息?

spring-boot - Spring AMQP - 如何确认消息已成功传递和路由?

java - 如何在不同的 RabbitMQ vhost 上设置 Spring Cloud Stream Bindings 的 Binder

apache-kafka - 增加连接窗口大小和设置宽限期之间的区别

docker - 替代openjdk :8-alpine for Kafka Streams

apache-kafka - 如何检测kafka主题中的重复消息?

java - 如何将 spring.cloud.stream.kafka.bindings 配置属性应用于所有消费者