java - 如何用java编写Kafka Consumer Client来消费来自多个broker的消息?

标签 java apache-kafka kafka-consumer-api partitioner

我正在寻找java客户端(Kafka Consumer)来消费来自多个代理的消息。请指教

下面是使用简单分区器将消息发布到多个代理所编写的代码。

主题是使用复制因子“2”和分区“3”创建的。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
{
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    logger.info("Number of Partitions " + numPartitions);
    if (keyBytes == null) 
    {
        int nextValue = counter.getAndIncrement();
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) 
        {
            int part = toPositive(nextValue) % availablePartitions.size();
            int selectedPartition = availablePartitions.get(part).partition();
            logger.info("Selected partition is " + selectedPartition);
            return selectedPartition;
        } 
        else 
        {
            // no partitions are available, give a non-available partition
            return toPositive(nextValue) % numPartitions;
        }
    } 
    else 
    {
        // hash the keyBytes to choose a partition
        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

}


public void publishMessage(String message , String topic)
{
    Producer<String, String> producer = null;
    try
    {
     producer = new KafkaProducer<>(producerConfigs());
     logger.info("Topic to publish the message --" + this.topic);
     for(int i =0 ; i < 10 ; i++)
     {
     producer.send(new ProducerRecord<String, String>(this.topic, message));
     logger.info("Message Published Successfully");
     }
    }
    catch(Exception e)
    {
        logger.error("Exception Occured " + e.getMessage()) ;
    }
    finally
    {
     producer.close();
    }
}

public Map<String, Object> producerConfigs() 
{
    loadPropertyFile();
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    propsMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    propsMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    propsMap.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SimplePartitioner.class);
    propsMap.put(ProducerConfig.ACKS_CONFIG, "1");
    return propsMap;
}

public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    System.out.println("properties.getBootstrap()"  + properties.getBootstrap());
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrap());
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getAutocommit());
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getTimeout());
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupid());
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutooffset());
    return propsMap;
}

@KafkaListener(id = "ID1", topics = "${config.topic}", group = "${config.groupid}")
public void listen(ConsumerRecord<?, ?> record) 
{
    logger.info("Message Consumed " + record);
    logger.info("Partition From which Record is Received " + record.partition());
    this.message = record.value().toString();   
}

bootstrap.servers = [本地主机:9092,本地主机:9093,本地主机:9094]

最佳答案

如果您使用常规 Java 消费者,它将自动从多个代理读取。您无需编写特殊代码。只需订阅您想要消费的主题,消费者就会自动连接到相应的代理。您只需提供一个“单一入口点”代理——客户端会自动找出集群中的所有其他代理。

关于java - 如何用java编写Kafka Consumer Client来消费来自多个broker的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43069647/

相关文章:

java - J2SE 代理认证

java - 实现MVP的最佳方法

java - 断言等于将字符串和数组列表与 Excel 中的文本数据进行比较

java - Kafka - 如何在 Producer 类中获取失败的消息详细信息

python - 卡夫卡消费者 : How to start consuming from the last message in Python

apache-kafka - 如何在消费消息之前在 Kafka 中对主题的消费者进行身份验证/授权

java - JAVA 中字符串转公钥

apache-kafka - 即使Apache Kafka配置文件已经有了,为什么我们还需要提及Zookeeper的细节?

java - Spring for Kafka 2.3 在运行时设置偏移量(如果消费者存在),否则创建新消费者

apache-kafka - 卡夫卡生产者 : How to fairly balance the messages between kafka consumers (not between partitions)