java - Kafka Stream 和 KGlobalTable Join 问题

标签 java apache-kafka apache-kafka-streams

我在加入 KStream 与 GlobalKTable 时遇到问题,非常感谢您的帮助。

给定两个 Kafka 主题 orderscustomers:

订单

"1"     {"ID":"1","Name":"Myorder1","CustID":"100"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200"}

客户

"100"   {"CustID":"100","CustName":"Customer1"}

"200"   {"CustID":"200","CustName":"Customer2"}

要求是用客户名称丰富订单流

"1"     {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}

我正在尝试以下操作:

  1. orders 主题构建 KStream
  2. 根据客户主题构建 GlobalKTable
  3. 构建另一个连接订单和客户的流(在客户表中查找 Order.CustID)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
    customers,
    new KeyValueMapper<String, Order, String>() {            
        @Override
        public String apply(String key, Order value) {
           return value.CustID;
        }
    },
    new ValueJoiner<Order,Customer, EnrichedOrder>() {
        @Override
        public EnrichedOrder apply(Order order, Customer customer) {
            EnrichedOrder eorder = new EnrichedOrder();
            eorder.CustID = order.CustID;
            eorder.CustName = customer.CustName;
            eorder.ID = order.ID;
            eorder.Name = order.Name;           
            return eorder;
        }
    }
);

但它没有给出任何结果,也没有抛出任何异常。

使用 leftJoin 时,我收到 Customer 的 NullPointer 异常。

如果您遇到类似问题,请告诉我并建议如何解决此问题。

最佳答案

让我们仔细看看复制粘贴的内容:

客户主题中:

"100"   {"CustID":"100","CustName":"Customer1"}

您可以注意到键是一个字符串,并且此字符串包含双引号:“100”。通常,打印的字符串键不带双引号。我宁愿看到:

 100    {"CustID":"100","CustName":"Customer1"}

换句话说,您的 key 的 Java 字符串表示形式是 ""100"" (或 "\"100\""),而不是 "100" 正如我们所期望的。

另一方面,orders 主题中的值是 Json {"ID":"1","Name":"Myorder1","CustID":"100 "},属性 CustID 是一个字符串,这次用 Java "100" 表示。

当您加入 orderscustomers 时,您尝试将订单 CustID 100 与客户 key “100”。由于 CustID 中缺少 key 中的双引号,因此此操作将会失败。

关于java - Kafka Stream 和 KGlobalTable Join 问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56437585/

相关文章:

java - 从同一目录中的文件读取

java - Spring-Kafka 使用 ConcurrentKafkaListenerContainerFactory 来处理多个 @Kafkalistener

apache-kafka - 如何在消费者组kafka中动态添加消费者

java - Kafka 的流 API 可以帮助分发数百个分页请求吗?

java - Spring Boot Kafka 客户端是否有 "Circuit Breaker"?

java - Kafka Streams Global Store - 添加更改日志主题

java-正则表达式 : How to write regular expression for two digit numbers

java - Gridbag布局还是Grid布局?

java - "AWT widgets written in Java which delegated to peer classes that were written in C"是什么意思?

java - 为每个请求创建新的 Kafka Producer 对象