java - 在 Apache Kafka 中设置多个分区

标签 java apache-kafka apache-zookeeper producer-consumer

我试图从代码中将分区数量设置为 2,并且我有单节点设置(1 个 Zookeeper,1 个 kafka)。当我使用该消息时,我看到 kafka 仅使用一个分区来存储数据,我是否需要对设置进行任何修改才能拥有多个分区?

 private void setupZookeeper(String[] topicList){

    ZkClient zkClient = null;
    ZkUtils zkUtils = null;
    try {
        String[] zookeeperHosts = {"localhost:2181"}; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
        int sessionTimeOutInMs = 15 * 1000; // 15 secs
        int connectionTimeOutInMs = 10 * 1000; // 10 secs
        //String topicName = "testTopic";
        int noOfPartitions = 2;
        int noOfReplication = 1;

        for(String zookeeper:zookeeperHosts){

            zkClient = new ZkClient(zookeeper, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeper), false);
            for(String topicName: topicList){
                System.out.println("Setting no of partitions ="+noOfPartitions + "for topic" + topicName);
                AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, 
                         producerConfig(),RackAwareMode.Disabled$.MODULE$);
            }
        }



    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (zkClient != null) {
            zkClient.close();
        }
    }

我的 ProducerConfig 如下所示:

private Properties producerConfig() {
   Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");

  props.put("acks", "all");
  //props.put("retries", 0);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

return props;
}

最佳答案

when i consume the message i see that kafka is using only one partition to store the data

默认的消息分区策略如下,“只使用一个分区”可能是由于消息key不变、计算的哈希值相同、只路由到一个分区造成的。

  • 如果记录中指定了分区,则使用它;
  • 如果未指定分区但存在 key ,则根据 key 的哈希值选择分区;
  • 如果不存在分区或键,则以循环方式选择分区。

关于java - 在 Apache Kafka 中设置多个分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38601184/

相关文章:

java - 使用 DefaultTableModel 插入后 JTable 不刷新

java - 我可以根据给定的分区 ID 和偏移量列表使用 kafka 消息吗?

java - Spring Kafka 以 TCP 端口作为生产者

python - 我可以递归地在 Zookeeper 中创建路径吗?

Android 设备上的 Javafxports 和 SQLite

java - 内部类中的泛型

Java:从具有缓冲输入的随机访问文件中读取字符串

java - 使用 Kafka Streams 进行 OpenTracing - 如何?

与 zookeeper 相关的 hbase 错误

apache-zookeeper - 使用solrCloud爬取时出现SessionException