apache-kafka-streams - RocksDBStore 与 Serdes.Bytes() 和 Serdes.ByteArray() 的目的是什么?

标签 apache-kafka-streams

RocksDBStore<K,V>将键和值存储为 byte[]在磁盘上。它与 K 相互转换和V使用 Serdes 输入对象在构造 RocksDBStore<K,V> 的对象时提供.

鉴于此,请帮助我理解 RocksDbKeyValueBytesStoreSupplier 中以下代码的用途:

return new RocksDBStore<>(name,
                          Serdes.Bytes(),
                          Serdes.ByteArray());

提供Serdes.Bytes()Serdes.ByteArray()看起来很多余。

RocksDbKeyValueBytesStoreSupplier介绍于KAFKA-5650 (Kafka Streams 1.0.0)作为 KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines 的一部分.

在KIP-182中,有这样一句话:

The new Interface BytesStoreSupplier supersedes the existing StateStoreSupplier (which will remain untouched). This so we can provide a convenient way for users creating custom state stores to wrap them with caching/logging etc if they chose. In order to do this we need to force the inner most store, i.e, the custom store, to be a store of type <Bytes, byte[]>.

请帮助我理解为什么我们需要强制自定义商店的类型为 <Bytes, byte[]>

在另一个地方( KAFKA-5749 )我发现了类似的句子:

In order to support bytes store we need to create a MeteredSessionStore and ChangeloggingSessionStore. We then need to refactor the current SessionStore implementations to use this. All inner stores should by of type < Bytes, byte[] >

为什么?

最佳答案

您的观察是正确的——实现 KIP-182 的 PR 确实错过了从 RocksDBStore 中删除不再需要的 Serdes。该问题已在 1.1 版本中修复。

关于apache-kafka-streams - RocksDBStore 与 Serdes.Bytes() 和 Serdes.ByteArray() 的目的是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49697639/

相关文章:

go - 使用 sarama 编写 Kafka 制作人时无效的时间戳

java - Kafka Streams - 偏移量提交失败。请求超时

apache-kafka - 卡夫卡流 : Custom TimestampExtractor for aggregation

apache-kafka-streams - 如何设置 Kafka 流创建的状态存储的保留期

apache-kafka - Kafka Streams - GenericAvroSerde 上的未知魔法字节

apache-kafka - kafka streams - 加入分区主题

java - Kafka GroupTable 测试使用 ProcessorTopologyTestDriver 时生成额外消息

java - 删除 Kafka StateStore 中的记录不起作用(.delete(key) 上抛出 NullPointerException)

apache-kafka-streams - Kafka 流使用 dsl api 中调用的处理器的上下文转发

apache-kafka - 如何设置Kafka Streams消息压缩?