java - 在 kafka.apache.org 上运行示例时,Kafka 消费者未收到消息

标签 java apache-kafka kafka-consumer-api

我对 Kafka 很陌生,并尝试在 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example 上运行消费者示例,但它没有收到任何消息。

这是 Eclipse 控制台中的输出:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Shutting down Thread: 2
Shutting down Thread: 0
Shutting down Thread: 1

下面是我的消费者代码

public class ConsumerDemo {
private final ConsumerConnector consumer; //why private final
private final String topic;
private ExecutorService executor;

public ConsumerDemo(String a_zookeeper,String a_groupId,String a_topic)
{
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
    this.topic=a_topic;
}

public void shutdown()
{
    if(consumer != null)
        consumer.shutdown();
    if(executor != null)
        executor.shutdown();
}

public void run(int numThreads)
{
    Map<String,Integer> topicCountMap= new HashMap<String,Integer>();
    topicCountMap.put(topic, new Integer(numThreads));
    Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(topic);
    int m=streams.size();

    executor = Executors.newFixedThreadPool(numThreads);

    int threadNumber=0;
    for(final KafkaStream stream : streams)
    {
        executor.submit(new ConsumerMsgTask(stream,threadNumber));
        threadNumber++;
    }
}

private static ConsumerConfig createConsumerConfig(String a_zookeeper,String a_groupId)
{
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");

    return new ConsumerConfig(props);
}

public static void main(String[] arg)
{
    String[] args = {"192.168.0.123:2181","group-a","test1","3"};
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threads = Integer.parseInt(args[3]);

    ConsumerDemo demo = new ConsumerDemo(zooKeeper,groupId,topic);
    demo.run(threads);

    try
    {
        Thread.sleep(10000);
    }catch (InterruptedException ie)
    {

    }
    demo.shutdown();

}

这是 ConsumerMsgTask

    public class ConsumerMsgTask implements Runnable {
private KafkaStream<byte[], byte[]> m_stream;
private int m_threadNumber;

public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream,int threadNumber)
{
    m_threadNumber = threadNumber;
    m_stream = stream;
}

public void run()
{
    ConsumerIterator<byte[],byte[]> it = m_stream.iterator();
    int i=it.size();
    while(it.hasNext())
        System.out.println("Thread "+m_threadNumber+": "+ new String(it.next().message()));
     System.out.println("Shutting down Thread: " + m_threadNumber);
}

这是我的 ProducerDemo

    public class ProducerDemo {
public static void main(String[] args)
{
    Random rnd= new Random();
    int events=100;

    Properties props= new Properties();
    props.put("metadata.broker.list", "192.168.0.123:9092,192.168.0.123:9093,192.168.0.123:9094");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");

    ProducerConfig config = new ProducerConfig(props);

    Producer<String,String> producer=new Producer<String,String>(config);
    long start=System.currentTimeMillis();
    for(int i=0;i<events;i++)
    {
        long runtime=new Date().getTime();
        String ip="192.168.5."+rnd.nextInt(255);
        String msgs=runtime+",www.maodou.com,"+ip;
        KeyedMessage<String,String> data=new KeyedMessage<String,String>("test1",ip,msgs);
        producer.send(data);
    }
    System.out.println("time:"+(System.currentTimeMillis()-start));
    producer.close();

}

}

我通过以下命令创建了主题“test1”

$ bin/kafka-topics.sh --create --zookeeper 192.168.0.123:2181 --replication-factor 3 --partitions 3 --topic test1

这是使用在 CentOS 版本 6.5(最终版)上运行的 Kafka 0.8.2, 使用 OpenJDK“1.7.0_45”。

最佳答案

您的 Main() 中是否有以下代码?如果是这样,删除它就可以解决问题。 当主线程尝试在 10 秒后关闭消费者时,我们遇到了同样的问题。您可能必须使用 Apache Commons Cli 和 Apache Commons Daemon 来实现正常关闭/启动。

try {
    Thread.sleep(10000);
} catch (InterruptedException ie) {

}
example.shutdown();

关于java - 在 kafka.apache.org 上运行示例时,Kafka 消费者未收到消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28981639/

相关文章:

java - 在没有注释的情况下强制对 Spring MVC Controller 的 JSON 主体中的字符串值进行全局验证

java - 在 RCP headless 模式下打开 Eclipse View?

java - 用于添加全局存储的 Kafka 流用例

apache-kafka - Confluence Kafka Connect HDFS Sink 连接器延迟

kubernetes - 在本地 Kubernetes 集群中使用 Kafka Exporter 的 HPA

java - 如何用这个正则表达式拆分字符串?

java - 如何在 Spring Boot 应用程序中运行 Flyway 命令?

apache-kafka - Kafka-对数结束偏移(LEO)与高水印(HW)之间的差异

apache-kafka - Kafka Connect 能否保证 RetriableException 发生时的写入顺序?

apache-spark - 是否可以在 Kafka+Spark Streaming 中获取特定的消息偏移量?