我们有一个 Kafka 生产者,它以非常高的频率向保留时间 = 10 小时的主题生成键控消息。这些消息是实时更新的,使用的键是值已更改的元素的 ID。因此,该主题充当更改日志,并且将有许多重复的键。
现在,我们试图实现的是,当 Kafka 消费者启动时,无论最后的已知状态如何(新消费者、崩溃、重启等),它都会以某种方式构造一个包含所有键的最新值的表在一个主题中,然后像往常一样继续监听新的更新,保持 Kafka 服务器上的最小负载并让消费者完成大部分工作。我们尝试了很多方法,但似乎没有一个是最好的。
我们尝试了什么:
1 个变更日志主题 + 1 个紧凑主题:
缺点:
KSQL:
使用 KSQL,我们要么必须将 KTable 重写为主题,以便消费者可以看到它(额外主题),要么我们需要消费者执行 KSQL
SELECT
使用 KSQL Rest Server 并查询表(不如 Kafka API 快速和高性能)。卡夫卡消费者API:
消费者从头开始消费话题。这工作得很好,但消费者必须使用 10 小时的更改日志来构建最后一个值表。
卡夫卡流:
通过使用 KTables 如下:
KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));
Kafka Streams 将在每个 KTable 的 Kafka 服务器上创建 1 个主题(名为
{consumer_app_id}-{topic_name}-STATE-STORE-0000000000-changelog
),由于我们有大量的消费者,这将导致大量的主题。根据我们的尝试,看起来我们需要增加服务器负载或消费者启动时间。难道没有一种“完美”的方式来实现我们想要做的事情吗?
提前致谢。
最佳答案
By using KTables, Kafka Streams will create 1 topic on Kafka server per KTable, which will result in a huge number of topics since we a big number of consumers.
如果您只是将现有主题读入
KTable
(通过 StreamsBuilder#table()
),那么 Kafka Streams 不会创建额外的主题。 KSQL 也一样。如果您能明确您想对 KTable(s) 做什么,将会有所帮助。显然您正在做的事情确实会导致创建其他主题?
1 changelog topic + 1 compact topic:
你为什么要考虑有两个不同的主题?通常,更改日志主题应始终压缩。鉴于您的用例描述,我看不出它不应该是的原因:
Now, what we're trying to achieve is that when a Kafka consumer launches, regardless of the last known state (new consumer, crashed, restart, etc..), it will somehow construct a table with the latest values of all the keys in a topic, and then keeps listening for new updates as normal [...]
因此,压缩对您的用例非常有用。它还可以防止您描述的这个问题:
Consumer starts and consumes the topic from beginning. This worked perfectly, but the consumer has to consume the 10 hours change log to construct the last values table.
请注意,要重建最新的表值,Kafka Streams、KSQL 和 Kafka Consumer 三个都必须完全(从头到尾)读取表的底层主题。如果该主题未压缩,这可能确实需要很长时间,具体取决于数据量、主题保留设置等。
From what we have tried, it looks like we need to either increase the server load, or the consumer launch time. Isn't there a "perfect" way to achieve what we're trying to do?
如果不了解您的用例的更多信息,特别是填充 KTable(s) 后您想对它们做什么,我的答案是:
例如,如果 Kafka Consumer 应该对“表”数据进行任何有状态处理,我就不会使用它,因为 Kafka Consumer 缺少用于容错状态处理的内置功能。
关于apache-kafka - 从消费者开始时从主题中获取最新值,然后正常继续,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55748728/