让我们考虑多线程跨国 kafka 生产者。我应该在关闭之前 flush()
生产者吗?换句话说,事务生产者在发送数据之前是否会批量缓冲数据?
最佳答案
如 javadocs 中所述
Applications don't need to call flush method for transactional producers, since the commitTransaction() will flush all buffered records before performing the commit
javadoc 示例对此进行了最好的说明
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
简而言之,最好使用生产者的事务性 API(它们是阻塞的,并且在失败时会抛出异常)。
此外,在多线程应用程序的情况下,您需要确保每个生产者只有一个打开的事务。如果您在交易过程中遇到异常,您应该调用 producer.abortTransaction()
(也在示例中突出显示)维护生产者事务功能的恰好一次语义。
关于java - 多线程事务性卡夫卡生产者 - 我应该在关闭之前刷新吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60304726/