I'm hoping to get some help understanding transactions in Kafka, and in particular how to use transaction.id. Here is the context:
My assumptions so far include:
Then I read the following blog post: https://www.confluent.io/blog/transactions-apache-kafka/. In particular, the section "How to pick a transactional.id" seems to imply that I need to guarantee one producer instance per input partition. It says: "The key to fencing out zombies properly is to ensure that the input topics and partitions in the read-process-write cycle are always the same for a given transactional.id." It then gives the following example of the problem: "For instance, in a distributed stream processing application, suppose topic-partition tp0 was originally processed by transactional.id T0. If, at some point later, it could be mapped to another producer with transactional.id T1, there would be no fencing between T0 and T1. So it is possible for messages from tp0 to be reprocessed, violating the exactly-once processing guarantee."
I don't quite understand why this is the case. As I see it, as long as the transaction is atomic, I shouldn't care which producer processes messages from any given partition. I've been struggling with this for a day and would appreciate it if someone could tell me what I'm missing here. So: why can't the work be assigned to any producer instance with any transaction.id setting, as long as it is unique? And why do they say that if you do this, messages can leak through the isolation that transactions provide?
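For what it's worth, my reading of that blog section is that the transactional.id should be derived from the input partition, something like the sketch below (the "my-app" prefix and the broker address are placeholders of my own, not from the blog):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;

public class TransactionalIdPerPartition {
    // One transactional producer per input partition, with a transactional.id
    // derived deterministically from that partition.
    static KafkaProducer<String, String> producerFor(TopicPartition tp) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Stable across restarts: the same input partition always maps to the
        // same transactional.id, so a restarted instance fences its predecessor.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                  "my-app-" + tp.topic() + "-" + tp.partition());
        return new KafkaProducer<>(props);
    }
}
```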
Best answer
The blog post you mentioned contains all the information you are looking for, although it is quite dense.
From the "Why Transactions?" section of the aforementioned article:
Using vanilla Kafka producers and consumers configured for at-least-once delivery semantics, a stream processing application could lose exactly once processing semantics in the following ways:
The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.

We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.
Finally, in distributed environments, applications will crash or—worse!—temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics. We call this the problem of “zombie instances.” [emphasis added]
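The first failure mode above (duplicate writes caused by producer retries) is handled by the idempotent producer on its own, before transactions even enter the picture. A minimal sketch of enabling it, with placeholder broker and serializer settings:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Deduplicates internal retries of producer.send(); it does not, by
        // itself, prevent reprocessing of input messages or zombie instances.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
```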
From the "Transactional Semantics" section of the same article:
Zombie fencing
We solve the problem of zombie instances by requiring that each transactional producer be assigned a unique identifier called the transactional.id. This is used to identify the same producer instance across process restarts. [emphasis added]

The API requires that the first operation of a transactional producer should be to explicitly register its transactional.id with the Kafka cluster. When it does so, the Kafka broker checks for open transactions with the given transactional.id and completes them. It also increments an epoch associated with the transactional.id. The epoch is an internal piece of metadata stored for every transactional.id.

Once the epoch is bumped, any producers with the same transactional.id and an older epoch are considered zombies and are fenced off, i.e. future transactional writes from those producers are rejected. [emphasis added]
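Seen from the producer API, the fencing works roughly like the sketch below, assuming a hypothetical transactional.id "my-app-tp0": once a second producer registers the same transactional.id, the epoch is bumped and the older producer's next transactional operation fails with ProducerFencedException.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class ZombieFencingSketch {
    static Properties config(String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        return props;
    }

    public static void main(String[] args) {
        // "Zombie": an older instance that registered the transactional.id first.
        KafkaProducer<String, String> zombie = new KafkaProducer<>(config("my-app-tp0"));
        zombie.initTransactions(); // registers the id, epoch = N

        // Replacement instance using the *same* transactional.id.
        KafkaProducer<String, String> replacement = new KafkaProducer<>(config("my-app-tp0"));
        replacement.initTransactions(); // completes open transactions, bumps epoch to N+1

        try {
            zombie.beginTransaction();
            zombie.send(new ProducerRecord<>("output-topic", "key", "value"));
            zombie.commitTransaction(); // fails (here or on an earlier call): stale epoch
        } catch (ProducerFencedException e) {
            zombie.close(); // the only safe action for a fenced producer
        }
        replacement.close();
    }
}
```

This is why the transactional.id has to be stable across restarts: a restarted instance that picks the same id automatically fences the instance it replaces.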
And from the "Data Flow" section of the same article:
A: the producer and transaction coordinator interaction
When executing transactions, the producer makes requests to the transaction coordinator at the following points:
The initTransactions API registers a transactional.id with the coordinator. At this point, the coordinator closes any pending transactions with that transactional.id and bumps the epoch to fence out zombies. This happens only once per producer session. [emphasis added]

When the producer is about to send data to a partition for the first time in a transaction, the partition is registered with the coordinator first.

When the application calls commitTransaction or abortTransaction, a request is sent to the coordinator to begin the two phase commit protocol.
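Putting those steps together, a transactional read-process-write loop looks roughly like the sketch below. The topic names, group id, transactional.id, and the trivial "process" step are placeholders, and error handling (including ProducerFencedException) is omitted for brevity:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class ReadProcessWriteLoop {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-processing-group");
        cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        cProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-input-0");
        pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions(); // registers the transactional.id, bumps the epoch

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    // "Process" is whatever transformation the application does.
                    String result = record.value().toUpperCase();
                    producer.send(new ProducerRecord<>("output-topic", record.key(), result));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                }
                // Consumed offsets are committed inside the same transaction,
                // so the read and the write commit or abort together.
                producer.sendOffsetsToTransaction(offsets, "my-processing-group");
                producer.commitTransaction();
            }
        }
    }
}
```

Because the consumed offsets are committed via sendOffsetsToTransaction inside the same transaction as the output writes, the read and the write commit or abort together; and because the transactional.id is tied to the input partition, a restarted or replacement instance fences any zombie still holding that partition, which is exactly the guarantee the quoted sections describe.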
Hope this helps!
Regarding "java - How to pick a Kafka transaction.id", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50335227/