java - 简单的KTable构造

标签 java apache-kafka apache-kafka-streams

概念上

假设我有一个名为“地址”的主题,其中键是人名(字符串),值是人的地址(字符串)。对该人的地址的更新将是一条由其姓名作为键和新地址作为值组成的消息。因此,如果我只想要任何一个键的最新值,我想我会创建一个 ktable。当我这样做时,这里实际上发生了什么? Kafka 是否创建了一个实际上是 ktable 的新主题并截断了旧值?或者我是否必须为当前地址创建一个新主题?或者完全是另外一回事?

实际上

我发现的所有示例和教程都使用已弃用的方法,因此我希望有更新的方法。我当前的解决方案是阅读如下主题:

final Properties config = new Properties();
//leaving out all the config.put() for readability

final Consumer<String, String> consumer = new KafkaConsumer<>(config);

consumer.subscribe(Collections.singletonList("addresses"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(200);
        for (ConsumerRecord<String, String> record : records) {
            //do stuff
        }
    }
} finally {
    consumer.close();
}

这工作了一段时间,但现在我想要一个有状态的解决方案。有没有一种简单的方法可以将信息放入 ktable 中?我不想过滤它或任何东西,我只想任何条目来更新状态。提前致谢。

最佳答案

KTable 不会创建新主题。相反,它将源主题上的消息视为数据库中的更新插入 - 如果这是第一次看到该键,则就像插入一条新记录。如果不是第一次,则会将其视为对现有记录的更新。 KTable 可以用作 Kafka Streams 其他部分的输入,您也可以将该状态保存在本地存储中,并在应用程序的其他部分中查询该 K/V 存储。

关于java - 简单的KTable构造,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51659327/

相关文章:

apache-kafka - 使用选择键和转换在 DSL 拓扑上进行流重新分区

java - 如果添加新分区,会丢失 Kafka Streams 中的消息吗?

java - 无法使按钮对其他 Activity 不可见

java - 一旦我运行 kafka 服务器命令,Zookeeper 就会自动关闭

java - Maven:多模块项目依赖关系未解决(父pom)

java - Spring @KafkaListener 和并发

apache-kafka - 如何为实时股票价格设计发布/订阅架构

apache-kafka-streams - Spring Cloud Kafka Streams Binder 运行状况为 "UNKNOWN"

java - 在 SWT 浏览器中访问 DOM 对象

java - 如何计算显式创建的继承树中特定类的实例数?