java - 如何使用Spring Kafka生产者发送批量数据

标签 java apache-kafka spring-kafka

目前我有这样的代码:

KafkaTemplate<String, String> kafkaTemplate;

List<Pet> myData;

for(Pet p: myData) {
  String json = objectWriter.writeValueAsString(p)
  kafkaTemplate.send(topic, json)
}

所以每个列表项都是一一发送的。 如何一次发送整个列表?

最佳答案

所以没有直接的方法可以直接使用KafkaTemplate向kafka发送批量消息或KafkaProducer 。他们没有任何接受 List 的方法对象并将它们单独发送到不同的分区。

kafka生产者如何向kafka发送消息?

KafkaProducer

Kafka 生产者创建一批记录,然后立即发送所有记录,了解更多 information

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

Asynchronous send

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.

由于您正在使用spring-kafka您可以发送List<Objects>但你在这里发送JSONArrayJSONObject而不是每个 JSONObject到主题分区

public KafkaTemplate<String, List<Object>> createTemplate() {

        Map<String, Object> senderProps = senderProps();
ProducerFactory<Integer, String> pf =
          new DefaultKafkaProducerFactory<String, List<Object>>(senderProps);
        KafkaTemplate<String, List<Object>> template = new KafkaTemplate<>(pf);
return template;

 }

 public Map<String, Object> producerProps() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
       return props;

 }

KafkaTemplate<String, List<Object>> kafkaTemplate;

关于java - 如何使用Spring Kafka生产者发送批量数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58492689/

相关文章:

java - 如何在Dead lettering中使用RepublishMessageRecoverer?

spring - 如果消息失败并由 AfterRollbackProcessor 处理,如何在 Spring Kafka 中提交偏移量

Java在源码中设置Kafka保留时间

Spring Boot Kafka 启动错误 "Connection to node -1 could not be established. Broker may not be available."

java - JxBrowser 6.14.2 系统要求不明确。在多个 Linux 发行版上创建浏览器失败

java - 显示字符串中匹配的字符

java - 单个文件中的多个接口(interface)

apache-kafka - server.properties 中的分区数和 apache kafka 中的主题创建 --partition 参数不明确

apache-kafka - kafka 流如何计算水印?

java - Spring-Kafka @KafkaHandlers 不消耗各自的 java 对象