java - 用于添加全局存储的 Kafka 流用例

标签 java apache-kafka apache-kafka-streams

在 kafka 流中定义拓扑时,可以添加全局状态存储。它将需要一个源主题以及一个 ProcessorSupplier .
处理器接收记录并且可以在将它们添加到存储之前理论上转换它们。但是在恢复的情况下,记录直接从源主题(更改日志)插入到全局状态存储中,跳过在处理器中完成的最终转换。

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration


StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder, String topic, Consumed consumed, ProcessorSupplier stateUpdateSupplier)将全局 StateStore 添加到拓扑。

根据文档

NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This ProcessorNode should be used to keep the StateStore up-to-date.



同时,由于 kafka 错误跟踪器上的主要错误当前已打开:KAFKA-7663 Custom Processor supplied on addGlobalStore is not used when restoring state from topic它准确地解释了文档中所述的内容,但似乎是一个公认的错误。

我想知道 KAFKA-7663 是否确实是一个错误。根据文档,它似乎是这样设计的,在这种情况下,我很难理解用例。
有人可以解释这个低级 API 的主要用例吗?我唯一能想到的就是处理副作用,比如在处理器中做一些日志操作。

额外问题:如果源主题作为全局存储的更改日志,当由于保留期已过期而从主题中删除记录时,它会从全局状态存储中删除吗?或者删除是否只会在从变更日志中完全恢复存储后在商店中进行。

最佳答案

是的,这是一个很奇怪的小捕获 22,但是文档是正确的。全局状态存储的处理器不得对记录执行任何操作,而是将它们保存到存储中。

AFAIK,这不是一个哲学问题,只是一个实际问题。原因只是您观察到的行为...... Streams 将输入主题视为存储的更改日志主题,因此在恢复期间绕过处理器(以及反序列化)。

状态恢复绕过任何处理的原因是更改日志中的数据通常与存储中的数据相同,因此对其进行任何新操作实际上都是错误的。另外,将字节从网络中取出并将它们批量写入状态存储中会更有效。我说“通常”是因为在这种情况下,输入主题与普通的更改日志主题并不完全一样,因为它在存储放置期间不会收到其写入。

对于它的值(value),我也很难理解用例。看起来,我们应该:

  • 完全摆脱该处理器,并始终将二进制数据从网络中转储到存储中,就像恢复一样。
  • 重新设计全局存储以允许在全局存储之前进行任意转换。我们可以:
  • 继续使用输入主题并在恢复期间反序列化和调用处理器,或
  • 为全局存储添加一个真正的变更日志,这样我们就可以轮询输入主题,应用一些转换,然后写入全局存储和 global-store-changelog。然后,我们可以使用更改日志(而不是输入)进行恢复和复制。

  • 顺便说一句,如果你想要后一种行为,你现在可以通过应用你的转换然后使用 to(my-global-changelog) 来近似它。制造“变更日志”主题。然后,您将创建全局存储以从您的 my-global-changelog 中读取数据。而不是输入。

    所以,给你一个直接的答案,KAFKA-7663 不是一个错误。我将评论提议将其转换为功能请求的票证。

    额外答案:作为状态存储更改日志的主题不得配置为保留。实际上,这意味着您应该通过启用压缩和禁用日志保留来防止无限增长。

    在实践中,旧数据脱离保留和被删除并不是一个“事件”,消费者无法知道它是否/何时发生。因此,不可能从状态存储中删除数据以响应此非事件。它会如您所描述的那样发生……这些记录将无限期地留在全局存储中。如果/当一个实例被替换时,新的实例将从输入中恢复并且(显然)只接收当时存在于主题中的记录。因此,Streams 集群作为一个整体最终会以不一致的全局状态 View 结束。这就是您应该禁用保留的原因。

    从存储中“删除”旧数据的正确方法是将所需键的墓碑写入输入主题。然后这将正确传播到集群的所有成员,在恢复期间正确应用,并由代理正确压缩。

    我希望这一切都有帮助。当然,请在票证上加入并帮助我们塑造更直观的 API!

    关于java - 用于添加全局存储的 Kafka 流用例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59029964/

    相关文章:

    java - 比较Java中的两个csv文件

    java - 使用 Xmx 增加 JVM 堆内存不起作用

    java - Spring和卡夫卡: Join 3 Kafka topics to generate output Kafka streams

    apache-kafka - Kafka Streams - kafka-streams-application-reset.sh 发送错误的 API 版本

    java - Java中的快速增量哈希

    java - 如何终止我的应用程序的远程进程?

    java - Apache 光束 : Kafka consumer restarted over and over again

    apache-kafka - 如何从 Kafka 主题中检索特定数量的消息

    apache-kafka - 为什么在 kafka 2.1.1 中 zstd 比 gzip 慢 5 倍?

    apache-kafka - 如何对 Kafka 中的数据进行反规范化?