apache-kafka - 如何从 KTable 中获取排序后的 KeyValueStore?

标签 apache-kafka apache-kafka-streams rocksdb

我想从 KStream 中具体化一个 KTable,并且希望 KeyValueStore 按 Key 排序。

我尝试查找 KTable API 规范 ( https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KTable.html ),但不存在“排序”方法。我还查阅了这篇文章 ( https://dzone.com/articles/how-to-order-streamed-dataframes ),建议通过处理器 API 实现排序。但是,我正在检查是否可以通过其他方式实现这一点?

最佳答案

KafkaStream 允许您物化可查询状态存储。 然后,您可以通过调用方法 kafkaStream#store() 获得对存储的只读访问权限。

如果您定义持久存储,KafkaStreams 将使用 RocksDB 来存储您的数据。返回的 KeyValueIterator 实例将使用 RocksDB 迭代器,它允许您以排序的方式迭代键值 Rocks Iterator-Implementation .

示例:

    KafkaStreams streams = new KafkaStreams(topology, props);
    ReadOnlyKeyValueStore<Object, Object> store = streams.store("storeName", QueryableStoreTypes.keyValueStore());
    KeyValueIterator<Object, Object> iterator = store.all();

关于apache-kafka - 如何从 KTable 中获取排序后的 KeyValueStore?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55636157/

相关文章:

apache-kafka - Kafka 中的延迟消息消费

java - 卡夫卡 -> Spark流 -> Hbase。任务不可序列化错误由 : java. lang.IllegalStateException 引起:作业处于 DEFINE 状态而不是 RUNNING 状态

java - Kafka Streams - 提取每条记录的对象列表的时间戳

java - Flinkrocksdb压缩过滤器不工作

c++ - RocksDB 获取列族键值

rocksdb - 多个rocksdb实例

distributed-computing - 流处理引擎的并行行为

apache-kafka - Kafka Streams 线程模型在同一实例和 JVM 上具有多个流

apache-kafka - 如何发现并过滤掉Kafka Streams中的重复记录

apache-kafka - RocksDB数据丢失