apache-spark - Spark RDD 映射 1 到多个

标签 apache-spark cassandra rdd spark-cassandra-connector

我是 Spark 新手,有一个问题。我正在处理使用 textFile() 生成的 RDD,它是一个 csv 文件。对于每一行,我想将多行返回到一个新的 RDD(单个而不是多个)。这是我的代码:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
            new Function<String, Boolean>() {
                public Boolean call(String line) {
                    return line.contains("LinearAccelerationEvent");
                }
            }).map(
            new Function<String, LinearAccelerationEvent>() {
                public LinearAccelerationEvent call(String line) throws Exception {
                    String[] fields = line.split(",");
                    LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                    return linearAccelerationEvent;
                }
            }).cache();

我在这里所做的是过滤初始csv以仅获取LinearAccelerationEvent,然后我想将这些对象映射到LinearAccelerationEvent类并生成LinearAccelerationEvent对象的新RDD。对于初始 csv 文件的每一行,我必须生成多个 LinearAccelerometerEvent 对象,但我不知道该怎么做。我之所以要这样做是因为稍后这个RDD将被推送到cassandra,如下所示:

javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();

所以理想的解决方案是这样的:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
                new Function<String, Boolean>() {
                    public Boolean call(String line) {
                        return line.contains("LinearAccelerationEvent");
                    }
                }).map(
                new Function<String, LinearAccelerationEvent>() {
                    public LinearAccelerationEvent call(String line) throws Exception {
                        String[] fields = line.split(",");
                        for() {
                           LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                           return linearAccelerationEvent;
                        }
                }
            }).cache();

我可以使用 foreachPartition() 函数并将 for 循环的每个事件推送到 Cassandra,但我发现这种方法要慢得多。是否可以不使用 foreach 来做我想做的事情?谢谢

最佳答案

如果我理解正确,请返回LinearAccelerationEvent的集合(例如List)并调用flatMap而不是map。这将为每个加速事件在生成的 RDD 中生成一个值。

flatMap 与先调用 map 再调用 flatten 的效果相同。如果您熟悉 Hive,这与使用 HiveQL 中提供的爆炸 DTF 类似。

关于apache-spark - Spark RDD 映射 1 到多个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33981301/

相关文章:

Scala Spark - 任务不可序列化

python - 为什么 zip 会截断 pyspark 中的数据?

docker - 无法从主机连接到 cassandra

apache-spark - 使用 Spark SQL 的 SELECT 语句中的 cassandra uuid

scala - 在 Spark API 中,makeRDD 函数和并行化函数有什么区别?

java - 创建 JavaSparkContext 时出现 NoClassDefFoundError

scala - 如何在 Spark 中创建一个空的数据帧

Cassandra - 重叠数据范围

scala - Spark 缓存 : RDD Only 8% cached

apache-spark - 如何在多列上编写 Pyspark UDAF?