java - 在 Spark 中过滤自定义数据结构

标签 java apache-spark hdfs rdd distributed-computing

我正在尝试将 csv 文件读入 JavaRDD。为此,我编写了以下代码:

SparkConf conf = new SparkConf().setAppName("NameOfApp").setMaster("spark://Ip here:7077");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<CurrencyPair> rdd_records = sc.textFile(System.getProperty("user.dir") + "/data/data.csv", 2).map(
        new Function<String, CurrencyPair>() {
            public CurrencyPair call(String line) throws Exception {
                String[] fields = line.split(",");
                CurrencyPair sd = new CurrencyPair(Integer.parseInt(fields[0].trim()), Double.parseDouble(fields[1].trim()),
                        Double.parseDouble(fields[2].trim()), Double.parseDouble(fields[3]), new Date(fields[4]));
                return sd;
            }
        }
);

我的数据文件如下所示:

1,0.034968,212285,7457.23,"2019-03-08 18:36:18"

在这里,为了检查我的数据是否正确加载,我尝试打印其中的一些数据:

System.out.println("Count: " + rdd_records.count());
List<CurrencyPair> list = rdd_records.top(5);
System.out.println(list.toString());

但是我在两个系统输出行都出现了以下错误。我也单独尝试了它们中的每一个,而不是同时打印计数和列表。

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

我的自定义对象如下所示:

public class CurrencyPair implements Serializable {

private int id;
private double value;
private double baseVolume;
private double quoteVolume;
private Date timeStamp;

public CurrencyPair(int id, double value, double baseVolume, double quoteVolume, Date timeStamp) {
    this.id = id;
    this.value = value;
    this.baseVolume = baseVolume;
    this.quoteVolume = quoteVolume;
    this.timeStamp = timeStamp;
}

public int getId() {
    return id;
}

public void setId(int id) {
    this.id = id;
}

public double getValue() {
    return value;
}

public void setValue(double value) {
    this.value = value;
}

public double getBaseVolume() {
    return baseVolume;
}

public void setBaseVolume(double baseVolume) {
    this.baseVolume = baseVolume;
}

public double getQuoteVolume() {
    return quoteVolume;
}

public void setQuoteVolume(double quoteVolume) {
    this.quoteVolume = quoteVolume;
}

public Date getTimeStamp() {
    return timeStamp;
}

public void setTimeStamp(Date timeStamp) {
    this.timeStamp = timeStamp;
}
}

所以我无法弄清楚这里出了什么问题。我做错了什么?

<小时/>

编辑:当我写本地而不是我自己的spark master IP时,效果很好。但我需要在我自己的 IP 上运行它。那么我的主节点可能出了什么问题?

最佳答案

问题可能是匿名类定义 new Function<String, CurrencyPair>() {这迫使 Spark 也尝试序列化父类。尝试使用 lambda 代替:

rdd_records.map(
  (Function<String, CurrencyPair>) line -> {
    ...

注意:您可以将文件作为 CSV 读取,并使用数据集 API 和 Bean 编码器来完全跳过手动解析。

关于java - 在 Spark 中过滤自定义数据结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56082952/

相关文章:

apache-spark - 在分区内的多列上进行 Spark 聚合,无需洗牌

hadoop - 在 hadoop 的同一分区内合并多个文件的最佳选择?

hadoop - 我可以使用集群复制在两个不同的集群之间进行 Hbase 迁移吗?

java - 使用 Java 通过 http 上传文件时获取上传速度百分比进度?

java - 带有多个 jar 的 Unity3D(android jar + 纯 java lib)

java - 使用 Java 在 XMPP 服务器中创建新用户

linux - 超过命名空间配额时如何清空hadoop上的垃圾?

JAVA not in gzip format错误

python - pyspark 在分组的 applyInPandas 中添加多列(更改架构)

跨集群记录