java - 多线程事务性卡夫卡生产者 - 我应该在关闭之前刷新吗?

标签 java apache-kafka kafka-producer-api

让我们考虑多线程跨国 kafka 生产者。我应该在关闭之前 flush() 生产者吗?换句话说,事务生产者在发送数据之前是否会批量缓冲数据?

最佳答案

javadocs 中所述

Applications don't need to call flush method for transactional producers, since the commitTransaction() will flush all buffered records before performing the commit

javadoc 示例对此进行了最好的说明

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
 producer.beginTransaction();
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
 producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
 // We can't recover from these exceptions, so our only option is to close the producer and exit.
 producer.close();
} catch (KafkaException e) {
 // For all other exceptions, just abort the transaction and try again.
 producer.abortTransaction();
}
producer.close();

简而言之,最好使用生产者的事务性 API(它们是阻塞的,并且在失败时会抛出异常)。

此外,在多线程应用程序的情况下,您需要确保每个生产者只有一个打开的事务。如果您在交易过程中遇到异常,您应该调用 producer.abortTransaction() (也在示例中突出显示)维护生产者事务功能的恰好一次语义

关于java - 多线程事务性卡夫卡生产者 - 我应该在关闭之前刷新吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60304726/

相关文章:

java - 如何在 Kafka 中为消费者发送 OffsetCommitRequest?

apache-kafka - 是 ISR 列表的领导者部分

java - Kafka 偏移量未增加

apache-kafka - Kafka : The message when serialized is larger than the maximum request size you have configured with the max. request.size 配置

java - 从具有最低键的优先级队列中删除节点

java - 如何通过单击在 JButton 上添加 JTextField?

java - 使用带有自定义编码器的 PubsubIO 阅读

java - 使用 selenium 网络驱动程序加载 chrome 浏览器的问题

apache-kafka - Kafka Streams 处理器 - 状态存储和输入主题分区

apache-kafka - kafka artifactIds kafka_2.10 和 kafka-clients 有什么区别?