java - 为什么消费者在使用 Java 客户端 API 在 DC/OS 上使用来自 Kafka 的消息时挂起?

标签 java apache-kafka dcos

我在 AWS 上的 DC/OS (Mesos) 集群上安装了 Kafka。启用三个代理并创建一个名为“topic1”的主题。

dcos kafka topic create topic1 --partitions 3 --replication 3

然后我编写了一个 Producer 类来发送消息和一个 Consumer 类来接收它们。

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("setting Producerconfig.");
        producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProcuder");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }

}

public class Consumer {
    public static String getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " of records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}

首先,我在集群上运行 Producer 以将消息发送到 topic1。但是,当我运行 Consumer 时,它什么也收不到,只是挂起。

Producer 正在工作,因为我能够通过运行 Kafka 安装附带的 shell 脚本来获取所有消息

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning

但是为什么我不能用 Consumer 接收?这post建议具有旧偏移量的 group.id 可能是一个可能的原因。我只在消费者而不是生产者中创建 group.id。如何配置该组的偏移量?

最佳答案

事实证明,kafkaConsumer.subscribe(Arrays.asList("topic1")); 导致 poll() 挂起。根据Kafka Consumer does not receive messages ,有两种方法可以连接到主题,assignsubscribe。在我用下面的行替换 subscribe 之后,它开始工作了。

    TopicPartition tp = new TopicPartition("topic1", 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);

但是输出显示的是非预期的数字数组(生产者发送的字符串)。但我想这是一个单独的问题。

关于java - 为什么消费者在使用 Java 客户端 API 在 DC/OS 上使用来自 Kafka 的消息时挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42496443/

相关文章:

java - Aerospike数据库设计

java - 如何在没有连接器的情况下创建正确的 kafka-connect 插件?

elasticsearch - 是否有适用于 DC/OS、ElasticSearch、Kafka Connect 和 Kafka Streams 的 CloudFormation 模板?

python - dcos 客户端安装失败-导入并发.futures ImportError : No module named concurrent. futures

azure - Azure 容器服务上的 Marathon - 无法扩展到所有节点

java - JDBC 连接无法从 apache Spark 连接 Teradata

java - SwitchPreference onChecked/onClick 监听器

java - 使用 Java 流将平面列表转换为具有子对象的域对象

database - Kafka 到 ES 和其他 sink DB

apache-kafka - Kafka10.1 heartbeat.interval.ms,session.timeout.ms和max.poll.interval.ms