java - 如何连接两个KTable并将结果ktable写入状态存储

标签 java apache-kafka apache-kafka-streams

我有两个 KTable 对象:

KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));

 KTable<Long, byte[]> secondTable = builder.table("secondTopic",
        Consumed.with(Serdes.Long(), Serdes.ByteArray()));

之后我想加入这两个表:

firstTable.leftJoin(secondTable,
            (leftValue, rightValue) -> {
            try {
                return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
            }
          )

所以我有两个表,我将它们连接到一个表中,我希望结果表按每个键存储在 kafka 状态存储中,但我不知道该怎么做。

最佳答案

您可以通过在 leftJoin 上指定 Materialized 参数并指定状态存储的名称来强制物化到本地存储中。

firstTable.leftJoin(..., Materialized.as("my-store-name"));

关于java - 如何连接两个KTable并将结果ktable写入状态存储,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53616769/

相关文章:

java - 声纳在 Jackson JSON 的 REST 调用中更改对象名称的问题

java - Primefaces - 未捕获的语法错误 : Unexpected token ILLEGAL

java - 来自 socket.getInputStream() 的 ObjectInputStream

apache-kafka - Kafka Streams拓扑的处理顺序是否指定?

java - 如何根据 Kafka Stream 的 JSON 内容过滤事件

java - 从 jbutton 多次单击,然后填充表

apache-kafka - 如何更改 Kafka 中特定主题的 TTL

queue - 简单的拉取消息队列

apache-kafka - 访问 Kafka Streams 中聚合器内的 TimeWindow 属性

java - 使用自定义 TimestampExtractor 的 Kafka Streams 窗口