java - 如何在 Kafka 0.11 中正确提交生产者并消费事务消息?

标签 java apache-kafka producer-consumer transactional producer

我正在尝试使用 Java 编写 Kafka Transnational 生产者。

喜欢

    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(rec, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    e.printStackTrace();
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
        });
        producer.commitTransaction();
    }catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){
        producer.close();
    }catch(KafkaException e) {
        producer.abortTransaction();
    }catch (Exception x){}
    producer.close();

它没有抛出任何错误。 send 也在 Kafka 中推送消息,它是可用的。

我可以看到的经纪人日志如下:

[2017-10-30 19:30:56,574] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __transaction_state-11. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-10-30 19:31:19,379] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.28508215642368573189913137 with producerId 11 and producer epoch 0 on partition __transaction_state-11 (kafka.coordinator.transaction.TransactionCoordinator)

5 分钟后我找到了这个代理日志。 [2017-10-30 19:36:44,123] INFO [Broker 1001 上的组元数据管理器]:在 0 毫秒内删除了 0 个过期偏移量。 (kafka.coordinator.group.GroupMetadataManager)

在此我只能看到事务已初始化,但没有进一步的提交日志或其他内容。

在生产者配置中我附加

transactional.id=<some random transaction ID>
enable.idempotence=true

如上所述请注意,如果配置了 TransactionalId,则必须启用enable.idempotence。默认为空,即不能使用交易。

我在 Kafka Documentation 中发现了一条语句生产者:发送 OffsetCommitRequest 以提交与该事务结束相关的输入状态

这是否意味着我必须告诉我要提交哪个Offset

我不确定制作人发生了什么

这是我的制作者调试日志:

1180 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.6069296543148491816257436, transactionTimeoutMs=60000) to node 127.0.0.1:9090 (id: 1001 rack: null)
1317 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] ProducerId set to 13 with epoch 0
1317 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state INITIALIZING to READY
1317 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state READY to IN_TRANSACTION
1323 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request (type=MetadataRequest, topics=topic) to node -1
1337 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [127.0.0.1:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
1362 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Begin adding new partition topic-0 to transaction
1386 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0])
1387 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0]) to node 127.0.0.1:9090 (id: 1001 rack: null)
1389 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
1392 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT)
1437 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Successfully added partitions [topic-0] to transaction
1439 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning sequence number 0 from producer (producerId=13, epoch=0) to dequeued batch from partition topic-0 bound for 127.0.0.1:9090 (id: 1001 rack: null).
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.records-per-batch
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.bytes
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.compression-rate
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.record-retries
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.record-errors
1453 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence number for topic-partition topic-0 to 1
The offset of the record we just sent is: 13
1455 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT) to node 127.0.0.1:9090 (id: 1001 rack: null)
1457 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state COMMITTING_TRANSACTION to READY

我认为在提交交易之前我遗漏了一些东西。在消费者中,如果我设置 READ_COMMITTED ,我也无法消费。如果没有,它就可以正常工作,甚至我也会收到我使用事务生产者生成的消息。

我的消费者代码,用于读取我拥有的交易消息

configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
configProperties.put("group.id","new-group-id");
configProperties.put("enable.auto.commit", "true");
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

并且消费者正在订阅topic主题并且

我的消费者调试控制台日志是:

126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-0 at offset 1 to node 127.0.0.1:9090 (id: 1001 rack: null)
126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-0] to broker 127.0.0.1:9090 (id: 1001 rack: null)
126551 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 1 for partition topic-0 returned fetch data (error=NONE, highWaterMark=17, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)

在此,它连续重复相同的 3 行,我有 13 作为最高偏移值。在消费者中我无法消费消息。

我设置了 1 个节点集群,并在 3 个节点上尝试过,它也显示了相同的结果。

感谢任何帮助。

最佳答案

最后对我来说它开始起作用了。

我无法确切地说出问题是什么。但根据我的观察,这与 WINDOWS 操作系统有关。

如果我们的代理在 Windows 计算机上,它就无法按预期工作。如果代理在 Linux 机器上,它就可以正常工作。

我的观察:

Windows 计算机中 __transaction_state 的日志转储段

Dumping 00000000000000000000.index
offset: 0 position: 0
Dumping 00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 .... transactionalId=TXN_ID:1 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510143189059,txnTimeoutMs=60000
offset: 1 position: 117 .... transactionalId=TXN_ID:1 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189232,txnTimeoutMs=60000
offset: 2 position: 250 .... transactionalId=TXN_ID:1 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189393,txnTimeoutMs=60000
Dumping 00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1510143189059
Index timestamp: 0, log timestamp: 1510143189059
Found out of order timestamp in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
Index timestamp: 0, Previously indexed timestamp: 0

在上面的日志中,您可以轻松地看到事务状态为 EmptyOngoingPrepareCommit。但没有提到提交/事务的完成。

但是在我的生产者控制台日志中,它表示事务已完成,因此肯定存在一些问题。

Linux 机器中 __transaction_state 的日志转储段

offset: 0 position: 0 .... transactionalId=TXN_ID:2 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510145904630,txnTimeoutMs=60000
offset: 1 position: 117 .... transactionalId=TXN_ID:2 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904763,txnTimeoutMs=60000
offset: 2 position: 250 .... transactionalId=TXN_ID:2 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904931,txnTimeoutMs=60000
offset: 3 position: 383 .... transactionalId=TXN_ID:2 ... ,state=CompleteCommit,partitions=Set(),txnLastUpdateTimestamp=1510145904938,txnTimeoutMs=60000

但是这里我们可以很容易地发现总共提到了 4 个不同的状态。

正在进行PrepareCommitCompleteCommit。这实际上使交易完成。

因此我们也可以重复使用该提交 ID。

因此,我可以得出结论,如果您想使用事务,请暂时在 Linux 上而不是 Windows 上使用 Kafka 代理。

关于java - 如何在 Kafka 0.11 中正确提交生产者并消费事务消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47018343/

相关文章:

java - 在 Unix 中使用 Java 发送多封邮件

java - 使用循环在 Java 中制作表格

java - 对标 Kafka - 性能一般

java - BlockingQueue 消费者在队列不为空时没有响应

algorithm - 对生产者消费者信号量顺序的一个小改动

java - JFreeChart拖拽绘图区

java - CellTable 在 TabLayoutPanel 中不可见

java - 如何在不同线程中处理@KafkaListener方法?

apache-kafka - 在 spring-kafka 中处理消费者错误后提交偏移量

java - Activemq如何配置消费者监听器(Java)