java - Spark Streaming 中的 Kafka 消费者

标签 java apache-spark apache-zookeeper apache-kafka spark-streaming

尝试编写一个使用来自 Kafka 的消息的 Spark Streaming 作业。这是我到目前为止所做的:

  1. 启动动物园管理员
  2. 启动 Kafka 服务器
  3. 向服务器发送了一些消息。我可以在运行以下命令时看到它们:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
    
  4. 现在尝试编写一个程序来计算 5 分钟内收到的消息数量。

代码看起来像这样:

Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
        sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

不确定第三个参数(消费者组)使用什么值。当我运行它时,我得到 Unable to connect to zookeeper server。但是 Zookeeper 运行在 2181 端口上;否则第 3 步将无法工作。

似乎我没有正确使用 KafkaUtils.createStream。有什么想法吗?

最佳答案

没有默认消费者组这样的东西。您可以在那里使用任意非空字符串。如果您只有一个消费者,那么它的消费者群体并不重要。如果有两个或两个以上的消费者,他们可以是同一个消费者组的一部分,也可以属于不同的消费者组。

来自 http://kafka.apache.org/documentation.html :

Consumers

...

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.

我认为问题可能出在“主题”参数中。 来自 Spark docs :

Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread

您只为主题指定了一个分区,即“1”。根据代理的设置 (num.partitions),可能有多个分区,您的消息可能会发送到您的程序无法读取的其他分区。

此外,我相信 partitionIds 是基于 0 的。因此,如果您只有一个分区,它的 ID 将等于 0。

关于java - Spark Streaming 中的 Kafka 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26725463/

相关文章:

java - 使用 Java 将文件写入 HDFS

java - Zookeeper cfg 文件 - 为什么有多个端口?

java - 每当有任何新行或任何新更新时从 Cassandra 数据库中提取?

Java 13 - 无法使用 `var` 关键字

java - 检索位序列并将其用于查询

Java 泛型类强制转换异常

apache-zookeeper - Zookeeper 连接超时问题

java - 插入前检查注册数据? JAVA

apache-spark - PYSPARK_PYTHON 适用于 --deploy-mode 客户端但不适用于 --deploy-mode 集群

java - 在 Windows 上的 Spark-Submit 中将多个 -D 参数传递给 driver-java-options