java - Kafka topic details not displayed in Spark

Tags: java apache-spark apache-kafka spark-streaming

I have written to a topic in Kafka called my-topic, and I am trying to fetch the topic's data in Spark. However, I am having trouble displaying the Kafka topic details because I am getting a long list of errors. I am using Java to fetch the data.

Below is my code:

public static void main(String s[]) throws InterruptedException{
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "Different id is allotted for different stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("my-topic");

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

    JavaPairDStream<String, String> jPairDStream =  stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    return new Tuple2<>(record.key(), record.value());
                }
            });

    jPairDStream.foreachRDD(jPairRDD -> {
           jPairRDD.foreach(rdd -> {
                System.out.println("key="+rdd._1()+" value="+rdd._2());
            });
        });

    jssc.start();            
    jssc.awaitTermination(); 

    stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    return new Tuple2<>(record.key(), record.value());
                }
            });
}

Below is the error I am getting:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 17/09/04 11:41:15 INFO SparkContext: Running Spark version 2.1.0 17/09/04 11:41:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/09/04 11:41:15 INFO SecurityManager: Changing view acls to: 11014525 17/09/04 11:41:15 INFO SecurityManager: Changing modify acls to: 11014525 17/09/04 11:41:15 INFO SecurityManager: Changing view acls groups to: 17/09/04 11:41:15 INFO SecurityManager: Changing modify acls groups to: 17/09/04 11:41:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(11014525); groups with view permissions: Set(); users with modify permissions: Set(11014525); groups with modify permissions: Set() 17/09/04 11:41:15 INFO Utils: Successfully started service 'sparkDriver' on port 56668. 17/09/04 11:41:15 INFO SparkEnv: Registering MapOutputTracker 17/09/04 11:41:15 INFO SparkEnv: Registering BlockManagerMaster 17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 17/09/04 11:41:15 INFO DiskBlockManager: Created local directory at C:\Users\11014525\AppData\Local\Temp\blockmgr-cba489b9-2458-455a-8c03-4c4395a01d44 17/09/04 11:41:15 INFO MemoryStore: MemoryStore started with capacity 896.4 MB 17/09/04 11:41:16 INFO SparkEnv: Registering OutputCommitCoordinator 17/09/04 11:41:16 INFO Utils: Successfully started service 'SparkUI' on port 4040. 17/09/04 11:41:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.16.202.21:4040 17/09/04 11:41:16 INFO Executor: Starting executor ID driver on host localhost 17/09/04 11:41:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56689. 
17/09/04 11:41:16 INFO NettyBlockTransferService: Server created on 172.16.202.21:56689 17/09/04 11:41:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 17/09/04 11:41:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.202.21, 56689, None) 17/09/04 11:41:16 INFO BlockManagerMasterEndpoint: Registering block manager 172.16.202.21:56689 with 896.4 MB RAM, BlockManagerId(driver, 172.16.202.21, 56689, None) 17/09/04 11:41:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.202.21, 56689, None) 17/09/04 11:41:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.16.202.21, 56689, None) 17/09/04 11:41:16 WARN KafkaUtils: overriding enable.auto.commit to false for executor 17/09/04 11:41:16 WARN KafkaUtils: overriding auto.offset.reset to none for executor 17/09/04 11:41:16 WARN KafkaUtils: overriding executor group.id to spark-executor-Different id is allotted for different stream 17/09/04 11:41:16 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135 17/09/04 11:41:16 INFO DirectKafkaInputDStream: Slide time = 10000 ms 17/09/04 11:41:16 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated 17/09/04 11:41:16 INFO DirectKafkaInputDStream: Checkpoint interval = null 17/09/04 11:41:16 INFO DirectKafkaInputDStream: Remember interval = 10000 ms 17/09/04 11:41:16 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@23a3407b 17/09/04 11:41:16 INFO MappedDStream: Slide time = 10000 ms 17/09/04 11:41:16 INFO MappedDStream: Storage level = Serialized 1x Replicated 17/09/04 11:41:16 INFO MappedDStream: Checkpoint interval = null 17/09/04 11:41:16 INFO MappedDStream: Remember interval = 10000 ms 17/09/04 11:41:16 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@140030a9 17/09/04 11:41:16 INFO ForEachDStream: Slide time = 10000 ms 17/09/04 11:41:16 INFO ForEachDStream: Storage level = Serialized 1x Replicated 17/09/04 11:41:16 INFO ForEachDStream: Checkpoint interval = null 17/09/04 11:41:16 INFO ForEachDStream: Remember interval = 10000 ms 17/09/04 11:41:16 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@65041548 17/09/04 11:41:16 ERROR StreamingContext: Error starting the context, marking it as stopped org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value. 
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48) at org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:194) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:380) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:363) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:350) at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243) at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143) at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136) at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51) at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) at Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56) 17/09/04 11:41:16 INFO ReceiverTracker: ReceiverTracker stopped 17/09/04 11:41:16 INFO JobGenerator: Stopping JobGenerator immediately 17/09/04 11:41:16 INFO RecurringTimer: Stopped timer for JobGenerator after time -1 17/09/04 11:41:16 INFO JobGenerator: Stopped JobGenerator 17/09/04 11:41:16 INFO JobScheduler: Stopped JobScheduler Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value. 
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48) at org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:194) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:380) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:363) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:350) at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243) at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143) at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136) at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51) at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) at Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56) 17/09/04 11:41:16 INFO SparkContext: Invoking stop() from shutdown hook 17/09/04 11:41:16 INFO SparkUI: Stopped Spark web UI at http://172.16.202.21:4040 17/09/04 11:41:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 17/09/04 11:41:16 INFO MemoryStore: MemoryStore cleared 17/09/04 11:41:16 INFO BlockManager: BlockManager stopped 17/09/04 11:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped 17/09/04 11:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 17/09/04 11:41:16 INFO SparkContext: Successfully stopped SparkContext 17/09/04 11:41:16 INFO ShutdownHookManager: Shutdown hook called 17/09/04 11:41:16 INFO ShutdownHookManager: Deleting directory C:\Users\11014525\AppData\Local\Temp\spark-37334cdc-9680-4801-8e50-ef3024ed1d8a

pom.xml

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
        <version>2.1.1</version>
</dependency>

Best Answer

From the log, your Spark version is 2.1.0. You have not shared the build file containing your other dependencies. It looks like you have both spark-streaming-kafka-0-8_2.11-2.1.0.jar and spark-streaming-kafka-0-10_2.11-2.1.0.jar on the classpath, and the wrong class is being loaded. If you are using Maven, you need dependencies like the ones below. Please check and update your project.

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
</dependency>  
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
</dependency> 
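
To confirm which integration jars actually end up on the classpath, running mvn dependency:tree and checking for both spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10 entries is a quick test. If the 0-8 artifact is being pulled in transitively by another dependency, a Maven exclusion keeps it out; the snippet below is only a sketch, with placeholder coordinates standing in for whichever dependency drags it in:

<dependency>
        <!-- placeholders: replace with the dependency that transitively pulls in the 0-8 integration -->
        <groupId>some.group</groupId>
        <artifactId>some-artifact</artifactId>
        <version>x.y.z</version>
        <exclusions>
                <exclusion>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                </exclusion>
        </exclusions>
</dependency>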

EDIT

Editing my answer now that you have edited the question and posted your dependencies. You are using Kafka version 0.8.*, while your spark-streaming-kafka integration targets 0.10.*. Please use the same version for the Kafka dependency; this mismatch is most likely why the 0.8.2 client classes get loaded, and their ConsumerConfig requires partition.assignment.strategy with no default value, which is exactly the error you see. Please use the following org.apache.kafka dependency:

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
</dependency>
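
Putting the two parts together, a consistent dependency set for this project might look like the sketch below (assumptions on my side: Scala 2.11 artifacts throughout, and keeping your existing commons-lang dependency):

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
</dependency>
<dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
</dependency>

The key points are a single Kafka integration artifact (0-10 only), a Kafka client version that matches it (0.10.x), and the same Scala suffix (_2.11) on every Spark and Kafka artifact.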

Regarding java - Kafka topic details not displayed in Spark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46031254/
