apache-kafka - Kafka 上的多线程 在 Spring Reactor Kafka 中发送

标签 apache-kafka spring-webflux spring-kafka project-reactor reactive-kafka

我有一个响应式(Reactive) kafka 应用程序,它从一个主题读取数据,转换消息并写入另一个主题。我的主题中有多个分区,因此我正在创建多个使用者来并行读取主题。每个消费者都在不同的线程上运行。但看起来 kafka send 运行在同一个线程上,即使它是从不同的消费者调用的。 我通过记录线程名称来了解线程工作流程进行测试,每个消费者的接收线程名称不同,但在kafka发送[kafkaProducerTemplate.send]上,线程名称[线程名称:生产者-1]对于所有消费者都是相同的。我不明白这是如何工作的,我希望所有发送的消费者也会有所不同。有人可以帮助我理解这是如何工作的吗?

@Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic))
                .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
                .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
    }

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
        KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
}


public void run(String... args) {

        for(int i = 0; i < topicPartitionsCount ; i++) {
            readWrite(destinationTopic).subscribe();
        }
    }}


public Flux<String> readWrite(String destTopic) {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(),Thread.currentThread().getName()))
                .doOnNext(s-> sendToKafka(s,destTopic))
                .map(ConsumerRecord::value)               
                .onErrorContinue((exception,errorConsumer)->{
                    log.error("Error while consuming : {}", exception.getMessage());
                });
    }

public void sendToKafka(ConsumerRecord<String, String> consumerRecord, String destTopic){
   kafkaProducerTemplate.send(destTopic, consumerRecord.key(), transformRecord(consumerRecord))
                    .doOnNext(senderResult -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(),Thread.currentThread().getName()))
                    .doOnSuccess(senderResult -> {
                        log.debug("Sent {} offset : {}", metrics, senderResult.recordMetadata().offset());
                    }
                    .doOnError(exception -> {
                        log.error("Error while sending message to destination topic : {}", exception.getMessage());
                    })
                    .subscribe();
}

最佳答案

生产者的所有发送都在单线程调度程序上运行(通过 .publishOn())。

请参阅DefaultKafkaSender.doSend()

您应该为每个消费者创建一个发送者。

关于apache-kafka - Kafka 上的多线程 在 Spring Reactor Kafka 中发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69891782/

相关文章:

apache-kafka - 使用 Apache Storm 获取数据

docker - 如何部署其他Kafka Broker(Docker镜像)

hadoop - 文件传输到HDFS

spring - WebClient block 正在抛出 nullPointerException

spring-kafka - Kafka 消费者无法连接到 localhost :9092 using Spring Boot 2. 2.0.M4 以外的代理

spring-boot - 本地docker服务的Spring Boot Kafka连接问题

apache-kafka - vert.x事件总线可以代替Kafka的需要吗?

java - 如何在 Spring WebClient 中捕获 ConnectionException?

java - 如何捕获 react 流取消信号?

Spring Kafka - 如何使用 @KafkaListener 重试