java - Kafka Spark Streaming consumer does not receive any messages from Kafka Console Producer?

Tags: java apache-spark apache-kafka spark-streaming

I am trying to integrate Spark and Kafka to consume messages from Kafka. I also have producer code that sends messages to the "temp" topic, and I am additionally using Kafka's console producer to produce messages on the "temp" topic.

I created the code below to consume messages from the same "temp" topic, but it does not receive a single message.

Program:

import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import static org.apache.commons.lang3.StringUtils.SPACE;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public class ConsumerDemo {

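    // Instance method; the JVM entry point is the static main(String[]) below.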
    public void main() {
        String zkGroup = "localhost:2181";
        String group = "test";
        String[] topics = {"temp"};
        int numThreads = 1;

        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaKafkaWordCount")
                .setMaster("local[4]")
                .set("spark.ui.port", "7077")
                .set("spark.executor.memory", "1g");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        Map<String, Integer> topicMap = new HashMap<>();
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        System.out.println("topics : " + Arrays.toString(topics));
        JavaPairReceiverInputDStream<String, String> messages
                = KafkaUtils.createStream(jssc, zkGroup, group, topicMap);

        messages.print();

        JavaDStream<String> lines = messages.map(Tuple2::_2);

        //lines.print();
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        //wordCounts.print();
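        // No data flows until the context is started; awaitTermination() blocks the driver.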
        jssc.start();
        jssc.awaitTermination();
    }

    public static void main(String[] args) {
        System.out.println("Started...");
        new ConsumerDemo().main();
        System.out.println("Ended...");
    }
}

I have added the following dependencies to my pom.xml file:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>0.9.0-incubating</version>
        <type>jar</type>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
        <type>jar</type>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

    <dependency>
        <groupId>org.anarres.lzo</groupId>
        <artifactId>lzo-core</artifactId>
        <version>1.0.5</version>
        <type>jar</type>
    </dependency>

    <dependency> 
        <groupId>com.fasterxml.jackson.core</groupId> 
        <artifactId>jackson-databind</artifactId> 
        <version>2.8.2</version> 
    </dependency> 
    <dependency> 
        <groupId>com.fasterxml.jackson.module</groupId> 
        <artifactId>jackson-module-scala_2.10</artifactId> 
        <version>2.8.2</version> 
    </dependency>
    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>

Am I missing some dependency, or is there a problem in the code? Why does this code not receive any messages?

Best Answer

You are not calling the method that contains the code that connects to Kafka and consumes the messages. Either write that logic inside public static void main(), or call the method in which you wrote it.
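For reference, here is a minimal sketch of that fix, assuming the receiver-based KafkaUtils.createStream API from spark-streaming-kafka 1.6.x used in the question (the method name consume is an illustrative choice, not anything Spark requires):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class ConsumerDemo {

    // All consumer logic lives in an ordinary instance method.
    public void consume() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaKafkaWordCount")
                .setMaster("local[4]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("temp", 1); // topic -> number of receiver threads

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "localhost:2181", "test", topicMap);
        messages.print();

        jssc.start();            // nothing is consumed until the context starts
        jssc.awaitTermination(); // block so the streaming job keeps running
    }

    public static void main(String[] args) {
        // The JVM only invokes the static main(String[]); an instance method,
        // even one named main(), must be called explicitly or it never runs.
        new ConsumerDemo().consume();
    }
}

With this structure, each line typed into the console producer on the "temp" topic should appear in the driver output via messages.print().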

Regarding "java - Kafka Spark Streaming consumer does not receive any messages from Kafka Console Producer?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45982651/
