java - RDD 不可序列化 Cassandra/Spark 连接器 java API

标签 java cassandra apache-spark connector datastax

所以我之前有一些关于如何在 java maven 项目中使用 spark 查询 cassandra 的问题:Querying Data in Cassandra via Spark in a Java Maven Project

好吧,我的问题得到了回答并且有效,但是我遇到了一个问题(可能是一个问题)。我现在正在尝试使用 datastax java API。这是我的代码:

package com.angel.testspark.test2;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
               }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

这是我的错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

现在我确切地知道我的错误在哪里。它是 System.out.println("Data as Person beans:\n"+ StringUtils.join("\n", rdd.toArray())); 因为我需要将 rdd 转换为大批。但是,API 文档说我应该能够做到这一点……这是从文档中复制和粘贴的代码。为什么我不能将 RDD 序列化为数组?

我已经使用我在上面链接中包含的帖子中的插入内容将虚拟数据插入到我的 cassandra 中。

此外,我之前解决的一个错误是将所有 getter 和 setter 更改为小写。当我在其中使用大写字母时,会产生错误。为什么我不能在此处的 getter 和 setter 中使用大写字母?

谢谢, 天使

最佳答案

public class App 更改为 public class App implements Serializable 应该可以修复错误。因为 java 内部类将保留对外部类的引用,所以您的 Function 对象将具有对 App 的引用。由于 Spark 需要序列化您的 Function 对象,因此它要求 App 也是可序列化的。

关于java - RDD 不可序列化 Cassandra/Spark 连接器 java API,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25982263/

相关文章:

java - 当我将一个列表子列出到多个新列表中时,我丢失了项目。如何在不丢失数据的情况下子列表?

java - Gen-Class 不生成 .class 文件

java - 如何缓存 InputStream 以供多次使用

mongodb - 如何使用 NoSQL (Cassandra) 实现客户端身份验证解决方案?

performance - DSE- Cassandra : Commit Log Disk Impact on Performances

hadoop - 我正在CDH5.4上使用Hbase 1.0.0和Apache phoenix 4.3.0。当我重新启动Hbase regionserver时关闭

apache-spark - Spark在yarn模式下增加执行器数量

java - 为什么之前的sql语句又被执行了?

cassandra - 在 Titan-1.1.0-SNAPSHOT 中增加 thrift 帧大小的正确配置是什么

scala - 在 Spark 数据集上使用 groupByKey 中的最小/最大操作