java - 如何选择一个Kafka transaction.id

标签 java apache-kafka

我想知道我能否在理解Kafka中的交易方面获得帮助,尤其是如何使用transaction.id。这里是上下文:

  • 我的Kafka应用程序遵循以下模式:使用来自输入主题的消息,进行处理,然后发布到输出主题。
  • 我不使用Kafka Streams API。
  • 我在一个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。
  • 有一个带有工作线程的线程池,该线程用于执行消息处理并将其发布到输出主题。目前,每个线程都有自己的生产者实例。
  • 我正在使用已发布的事务API,以确保消耗偏移量的更新和对输出主题的发布原子地进行

  • 到目前为止,我的假设包括:
  • 如果我的进程在中间事务中崩溃,那么该事务中的任何内容都不会发布,也不会消耗消耗。因此,在重新启动时,我只需从原始的消耗偏移量再次启动事务即可。
  • 对于生产者transaction.id,重要的是它是唯一的。因此,我可以在启动
  • 时生成基于时间戳的id

    然后,我阅读了以下博客:https://www.confluent.io/blog/transactions-apache-kafka/。特别是在“如何选择交易ID”部分中,这似乎意味着我需要保证每个输入分区都有一个生产者实例。它说:“正确抵御僵尸的关键是确保对于给定的transactional.id,读-写-写循环中的输入主题和分区始终相同。”它还进一步列举了以下问题示例:“例如,在分布式流处理应用程序中,假定主题分区tp0最初是由transactional.id T0处理的。如果在以后的某个时候,它可以通过事务处理映射到另一个生产者.id T1,在T0和T1之间不会有隔离。因此,有可能重新处理来自tp0的消息,这完全违反了一次处理保证。”

    我不太明白为什么会这样。在我看来,只要事务是原子的,我就不在乎哪个生产者处理来自任何分区的消息。我已经为此努力了一天,我想知道是否有人可以告诉我我在这里错过的事情。因此,为什么不能将工作分配给具有任何transaction.id设置的任何生产者实例,只要它是唯一的。以及为什么他们说如果您这样做,则消息可能会通过事务提供的隔离机制泄漏。

    最佳答案

    您提到的博客文章包含了您正在寻找的所有信息,尽管内容非常密集。

    从为什么交易? aforementioned article中的部分。

    Using vanilla Kafka producers and consumers configured for at-least-once delivery semantics, a stream processing application could lose exactly once processing semantics in the following ways:

    1. The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.

    2. We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.

    3. Finally, in distributed environments, applications will crash or—worse!—temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics. We call this the problem of “zombie instances.” [emphasis added]



    same article的“事务语义”部分。

    Zombie fencing

    We solve the problem of zombie instances by requiring that each transactional producer be assigned a unique identifier called the transactional.id. This is used to identify the same producer instance across process restarts. [emphasis added]

    The API requires that the first operation of a transactional producer should be to explicitly register its transactional.id with the Kafka cluster. When it does so, the Kafka broker checks for open transactions with the given transactional.id and completes them. It also increments an epoch associated with the transactional.id. The epoch is an internal piece of metadata stored for every transactional.id.

    Once the epoch is bumped, any producers with same transactional.id and an older epoch are considered zombies and are fenced off, ie. future transactional writes from those producers are rejected. [emphasis added]



    并从same article中的“数据流”部分开始。

    A: the producer and transaction coordinator interaction

    When executing transactions, the producer makes requests to the transaction coordinator at the following points:

    1. The initTransactions API registers a transactional.id with the coordinator. At this point, the coordinator closes any pending transactions with that transactional.id and bumps the epoch to fence out zombies. This happens only once per producer session. [emphasis added]

    2. When the producer is about to send data to a partition for the first time in a transaction, the partition is registered with the coordinator first.

    3. When the application calls commitTransaction or abortTransaction, a request is sent to the coordinator to begin the two phase commit protocol.



    希望这可以帮助!

    关于java - 如何选择一个Kafka transaction.id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50335227/

    相关文章:

    java - 如何从 Kafka 访问记录中的文件路径并从中创建数据集?

    java - Toast动画中的CharSequence没有出现?安卓

    java - Android Wear - Google Maps API v2 可用吗?

    apache-kafka - 如何在消费者组kafka中动态添加消费者

    apache-kafka - Kafka 连接器 - 无法停止重新平衡

    hadoop - 无法全局访问 Kafka Spark Streaming 中的数据

    java - 让 JPA 生成具有指定行格式的表

    java - 如何使excel单元格成为强制性的?

    java - 使用 Spring JPA 的单向多对多映射

    apache-kafka - Spring kafka 消费者,在运行时寻求偏移量?