尝试编写一个使用来自 Kafka 的消息的 Spark Streaming 作业。这是我到目前为止所做的:
- 启动动物园管理员
- 启动 Kafka 服务器
向服务器发送了一些消息。我可以在运行以下命令时看到它们:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
现在尝试编写一个程序来计算 5 分钟内收到的消息数量。
代码看起来像这样:
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
不确定第三个参数(消费者组)使用什么值。当我运行它时,我得到 Unable to connect to zookeeper server
。但是 Zookeeper 运行在 2181
端口上;否则第 3 步将无法工作。
似乎我没有正确使用 KafkaUtils.createStream
。有什么想法吗?
最佳答案
没有默认消费者组这样的东西。您可以在那里使用任意非空字符串。如果您只有一个消费者,那么它的消费者群体并不重要。如果有两个或两个以上的消费者,他们可以是同一个消费者组的一部分,也可以属于不同的消费者组。
来自 http://kafka.apache.org/documentation.html :
Consumers
...
If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.
If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.
我认为问题可能出在“主题”参数中。 来自 Spark docs :
Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
您只为主题指定了一个分区,即“1”。根据代理的设置 (num.partitions),可能有多个分区,您的消息可能会发送到您的程序无法读取的其他分区。
此外,我相信 partitionIds 是基于 0 的。因此,如果您只有一个分区,它的 ID 将等于 0。
关于java - Spark Streaming 中的 Kafka 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26725463/