我在加入 KStream 与 GlobalKTable 时遇到问题,非常感谢您的帮助。
给定两个 Kafka 主题 orders
和 customers
:
订单
"1" {"ID":"1","Name":"Myorder1","CustID":"100"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200"}
客户
"100" {"CustID":"100","CustName":"Customer1"}
"200" {"CustID":"200","CustName":"Customer2"}
要求是用客户名称丰富订单流
"1" {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}
我正在尝试以下操作:
- 从
orders
主题构建 KStream - 根据
客户
主题构建 GlobalKTable - 构建另一个连接订单和客户的流(在客户表中查找 Order.CustID)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
customers,
new KeyValueMapper<String, Order, String>() {
@Override
public String apply(String key, Order value) {
return value.CustID;
}
},
new ValueJoiner<Order,Customer, EnrichedOrder>() {
@Override
public EnrichedOrder apply(Order order, Customer customer) {
EnrichedOrder eorder = new EnrichedOrder();
eorder.CustID = order.CustID;
eorder.CustName = customer.CustName;
eorder.ID = order.ID;
eorder.Name = order.Name;
return eorder;
}
}
);
但它没有给出任何结果,也没有抛出任何异常。
使用 leftJoin
时,我收到 Customer 的 NullPointer 异常。
如果您遇到类似问题,请告诉我并建议如何解决此问题。
最佳答案
让我们仔细看看复制粘贴的内容:
在客户
主题中:
"100" {"CustID":"100","CustName":"Customer1"}
您可以注意到键是一个字符串,并且此字符串包含双引号:“100”
。通常,打印的字符串键不带双引号。我宁愿看到:
100 {"CustID":"100","CustName":"Customer1"}
换句话说,您的 key 的 Java 字符串表示形式是 ""100""
(或 "\"100\""
),而不是 "100"
正如我们所期望的。
另一方面,orders
主题中的值是 Json {"ID":"1","Name":"Myorder1","CustID":"100 "}
,属性 CustID
是一个字符串,这次用 Java "100"
表示。
当您加入 orders
和 customers
时,您尝试将订单 CustID 100
与客户 key “100”
。由于 CustID 中缺少 key 中的双引号,因此此操作将会失败。
关于java - Kafka Stream 和 KGlobalTable Join 问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56437585/