apache-kafka - 从消费者开始时从主题中获取最新值,然后正常继续

标签 apache-kafka kafka-consumer-api apache-kafka-streams ksqldb

我们有一个 Kafka 生产者,它以非常高的频率向保留时间 = 10 小时的主题生成键控消息。这些消息是实时更新的,使用的键是值已更改的元素的 ID。因此,该主题充当更改日志,并且将有许多重复的键。

现在,我们试图实现的是,当 Kafka 消费者启动时,无论最后的已知状态如何(新消费者、崩溃、重启等),它都会以某种方式构造一个包含所有键的最新值的表在一个主题中,然后像往常一样继续监听新的更新,保持 Kafka 服务器上的最小负载并让消费者完成大部分工作。我们尝试了很多方法,但似乎没有一个是最好的。

我们尝试了什么:

1 个变更日志主题 + 1 个紧凑主题:

  • 生产者向包含在事务中的两个主题发送相同的消息以确保成功发送。
  • 消费者启动并请求更新日志主题的最新偏移量。
  • 从一开始就消耗压缩的主题来构造表。
  • 自请求的偏移量以来继续使用更改日志。

  • 缺点:
  • 即使将日志压缩频率设置为尽可能高,在压缩主题中有重复的可能性也很高。
  • Kakfa 服务器上的 x2 主题数。

  • KSQL:

    使用 KSQL,我们要么必须将 KTable 重写为主题,以便消费者可以看到它(额外主题),要么我们需要消费者执行 KSQL SELECT使用 KSQL Rest Server 并查询表(不如 Kafka API 快速和高性能)。

    卡夫卡消费者API:

    消费者从头开始消费话题。这工作得很好,但消费者必须使用 10 小时的更改日志来构建最后一个值表。

    卡夫卡流:

    通过使用 KTables 如下:

    KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
    KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));
    

    Kafka Streams 将在每个 KTable 的 Kafka 服务器上创建 1 个主题(名为 {consumer_app_id}-{topic_name}-STATE-STORE-0000000000-changelog ),由于我们有大量的消费者,这将导致大量的主题。

    根据我们的尝试,看起来我们需要增加服务器负载或消费者启动时间。难道没有一种“完美”的方式来实现我们想要做的事情吗?

    提前致谢。

    最佳答案

    By using KTables, Kafka Streams will create 1 topic on Kafka server per KTable, which will result in a huge number of topics since we a big number of consumers.



    如果您只是将现有主题读入 KTable (通过 StreamsBuilder#table() ),那么 Kafka Streams 不会创建额外的主题。 KSQL 也一样。

    如果您能明确您想对 KTable(s) 做什么,将会有所帮助。显然您正在做的事情确实会导致创建其他主题?

    1 changelog topic + 1 compact topic:



    你为什么要考虑有两个不同的主题?通常,更改日志主题应始终压缩。鉴于您的用例描述,我看不出它不应该是的原因:

    Now, what we're trying to achieve is that when a Kafka consumer launches, regardless of the last known state (new consumer, crashed, restart, etc..), it will somehow construct a table with the latest values of all the keys in a topic, and then keeps listening for new updates as normal [...]



    因此,压缩对您的用例非常有用。它还可以防止您描述的这个问题:

    Consumer starts and consumes the topic from beginning. This worked perfectly, but the consumer has to consume the 10 hours change log to construct the last values table.



    请注意,要重建最新的表值,Kafka Streams、KSQL 和 Kafka Consumer 三个都必须完全(从头到尾)读取表的底层主题。如果该主题未压缩,这可能确实需要很长时间,具体取决于数据量、主题保留设置等。

    From what we have tried, it looks like we need to either increase the server load, or the consumer launch time. Isn't there a "perfect" way to achieve what we're trying to do?



    如果不了解您的用例的更多信息,特别是填充 KTable(s) 后您想对它们做什么,我的答案是:
  • 确保“变更日志主题”也被压缩。
  • 首先尝试 KSQL。如果这不能满足您的需求,请尝试使用 Kafka Streams。如果这不能满足您的需求,请尝试使用 Kafka Consumer。

  • 例如,如果 Kafka Consumer 应该对“表”数据进行任何有状态处理,我就不会使用它,因为 Kafka Consumer 缺少用于容错状态处理的内置功能。

    关于apache-kafka - 从消费者开始时从主题中获取最新值,然后正常继续,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55748728/

    相关文章:

    java - 连接到在 Docker 中运行的 Kafka

    python - Kafka-python Producer执行Send,但是没有数据到达Kafka

    apache-kafka - 使用 spring-kafka 保证消息顺序的指数退避

    apache-kafka - 已激活压缩的 Kafka 消息大小

    apache-kafka - 我如何找到kafka消费者的费率?

    apache-kafka - 如果在收到创建消息之前收到更新消息,如何处理kafka消息?

    apache-kafka - Kafka 传递重复消息

    apache-kafka - Kafka 流与 Kafka 消费者如何决定使用什么

    apache-kafka - Apache Kafka 状态存储

    apache-kafka - 哪个 kafka 属性决定了 KafkaConsumer 的轮询频率?