我有两个 KTable 对象:
KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
KTable<Long, byte[]> secondTable = builder.table("secondTopic",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));
之后我想加入这两个表:
firstTable.leftJoin(secondTable,
(leftValue, rightValue) -> {
try {
return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
)
所以我有两个表,我将它们连接到一个表中,我希望结果表按每个键存储在 kafka 状态存储中,但我不知道该怎么做。
最佳答案
您可以通过在 leftJoin
上指定 Materialized
参数并指定状态存储的名称来强制物化到本地存储中。
firstTable.leftJoin(..., Materialized.as("my-store-name"));
关于java - 如何连接两个KTable并将结果ktable写入状态存储,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53616769/