java - Kafka -> Spark Streaming -> HBase: Task not serializable error caused by java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING

Tags: java hbase apache-kafka spark-streaming

I am trying to write out the data generated by the Kafka command-line producer for a topic.

I am running into a problem and cannot get past it. Below is my code, which I packaged as a jar and run with spark-submit.

Am I doing something wrong inside foreachRDD()? What is the problem with the line SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) shown in the error message below?

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaKafkaDemo")
        .setMaster("local")
        .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Receiver-based Kafka stream: ZooKeeper quorum, consumer group id, topic -> receiver thread count
int numThreads = 2;
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("nonview", numThreads);

JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);

// Keep only the message value of each (key, value) pair
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        // Turn each line into an HBase Put keyed by an ImmutableBytesWritable
        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
                new PairFunction<String, ImmutableBytesWritable, Put>() {
                    @Override
                    public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
                        Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
                        put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"), Bytes.toBytes(line + "fc"));
                        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                    }
                });

        // Save to HBase via Spark's built-in new-Hadoop-API method;
        // newAPIJobConfiguration1 is a Hadoop mapreduce Job configured elsewhere (not shown)
        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
        return null;
    }
});

jsc.start();
jsc.awaitTermination();
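
For context, newAPIJobConfiguration1 is referenced above but not shown in the snippet. A typical way to build such a Hadoop Job configuration for HBase's TableOutputFormat (org.apache.hadoop.hbase.mapreduce.TableOutputFormat together with org.apache.hadoop.mapreduce.Job) looks roughly like the sketch below; the table name "viewTable" is an assumption for illustration only, not taken from the original question.

    Configuration hbaseConf = HBaseConfiguration.create();
    // Table name is a placeholder; the real table is not given in the question
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "viewTable");
    Job newAPIJobConfiguration1 = Job.getInstance(hbaseConf);
    newAPIJobConfiguration1.setOutputFormatClass(TableOutputFormat.class);
    newAPIJobConfiguration1.setOutputKeyClass(ImmutableBytesWritable.class);
    newAPIJobConfiguration1.setOutputValueClass(Put.class);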

Error:

./bin/spark-submit --class "SparkKafkaDemo" --master local /Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
	at org.apache.spark.rdd.RDD.map(RDD.scala:286)
	at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
	at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
	at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
	at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
	at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
	at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
	at java.lang.String.valueOf(String.java:2847)
	at java.lang.StringBuilder.append(StringBuilder.java:128)
	at scala.StringContext.standardInterpolator(StringContext.scala:122)
	at scala.StringContext.s(StringContext.scala:90)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:103)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
	at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
	... 24 more

Best Answer

Please add the following serializer setting:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
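
A minimal sketch of where this setting fits in the SparkConf from the question (assuming the rest of the setup stays unchanged):

    SparkConf sparkConf = new SparkConf()
            .setAppName("JavaKafkaDemo")
            .setMaster("local")
            // Use Kryo instead of Spark's default Java serialization
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");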

Regarding "java - Kafka -> Spark Streaming -> HBase: Task not serializable error caused by java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/30815230/
