java - How to use Spark RDD and PairRDD together in Java

Tags: java, apache-spark

I have a dataset with the columns userId (String), itemId (int) and rating (int).

+----------+----------+---------+
| userId   |  itemId  |  rating |
+----------+----------+---------+
|  abc13   |    23    |    1    |
+----------+----------+---------+
|  qwe34   |    56    |    3    |
+----------+----------+---------+
|  qwe34   |    35    |    4    |
+----------+----------+---------+

I want to map the string userIds to unique long values. I tried mapping the userIds with zipWithUniqueId(), which gave me a pair RDD:

+------------+----------------+
|   userId   |  userIdMapped  |
+------------+----------------+
|    abc13   |        0       |   
+------------+----------------+
|    qwe34   |        1       |   
+------------+----------------+

I want to add the long values as another column and create a dataset like this:

+----------+----------+---------+----------------+
| userId   |  itemId  |  rating |  userIdMapped  |
+----------+----------+---------+----------------+
|  abc13   |    23    |    1    |       0        |
+----------+----------+---------+----------------+
|  qwe34   |    56    |    3    |       1        |
+----------+----------+---------+----------------+
|  qwe34   |    35    |    4    |       1        |
+----------+----------+---------+----------------+

I tried the following:

JavaRDD<Feedback> feedbackRDD = spark.read().jdbc(MYSQL_CONNECTION_URL, feedbackQuery, connectionProperties)
        .javaRDD().map(Feedback.mapFunc);
JavaPairRDD<String, Long> mappedPairRDD = feedbackRDD.map(new Function<Feedback, String>() {
    public String call(Feedback p) throws Exception {
        return p.getUserId();
    }
}).distinct().zipWithUniqueId();
Dataset<Row> feedbackDS = spark.createDataFrame(feedbackRDD, Feedback.class);
Dataset<String> stringIds = spark.createDataset(mappedPairRDD.keys().collect(), Encoders.STRING());
Dataset<Long> valueIds = spark.createDataset(mappedPairRDD.values().collect(), Encoders.LONG());
Dataset<Row> longIds = valueIds.withColumnRenamed("value", "userIdMapped");
Dataset<Row> userIdMap = longIds.join(stringIds);
Dataset<Row> feedbackDSUserMapped = feedbackDS.join(userIdMap, feedbackDS.col("userId").equalTo(userIdMap.col("value")),
        "inner");
// Here the 'value' column contains the string user ids

The userIdMap dataset is joined incorrectly (the join above has no condition, so it produces every combination of id and userId), as shown below:

+-----------------+----------------+
|   userIdMapped  |     value      |
+-----------------+----------------+
|         0       |     abc13      |   
+-----------------+----------------+
|         0       |     qwe34      |   
+-----------------+----------------+
|         1       |     abc13      |   
+-----------------+----------------+
|         1       |     qwe34      |   
+-----------------+----------------+

So the resulting feedbackDSUserMapped is wrong.
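For reference, here is a sketch of what I think a fix might look like while keeping the zipWithUniqueId mapping (spark is the same SparkSession as above; I have not verified this is the right approach): turn the pair RDD into a two-column DataFrame and join it on userId instead of using an unconditioned join.

// Sketch of a possible fix: materialize the (userId, long) pairs as a DataFrame
// with named columns, then join on the shared userId column.
Dataset<Row> userIdMap = spark
        .createDataset(mappedPairRDD.collect(),
                Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
        .toDF("userId", "userIdMapped");

Dataset<Row> feedbackDSUserMapped = feedbackDS.join(userIdMap, "userId");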

I'm new to Spark, and I believe there must be a better way to do this.

What is the best way to take the long values from the pair RDD and attach them to the corresponding userId in the initial dataset (RDD)?

Any help would be appreciated.

The data will be used for an ALS model.

Best Answer

You can try the following: assign unique ids with the built-in function and join the result with the original dataset.

/**
 * Created by RGOVIND on 11/16/2016.
 */

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;

import java.util.ArrayList;
import java.util.List;

public class SparkUserObjectMain {
    static public void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Stack Overflow App");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        List<UserObject> users = new ArrayList<UserObject>();

        //seed the data
        UserObject user1 = new UserObject("abc13", "23", "1");
        UserObject user2 = new UserObject("qwe34", "56", "3");
        UserObject user3 = new UserObject("qwe34", "35", "4");
        users.add(user1);
        users.add(user2);
        users.add(user3);

        //how to encode the object ?
        Encoder<UserObject> userObjectEncoder = Encoders.bean(UserObject.class);
        //Create the user dataset
        Dataset<UserObject> usersDataSet = sqlContext.createDataset(users, userObjectEncoder);
        //assign unique ids
        Dataset<Row> uniqueUsersWithId = usersDataSet.dropDuplicates("userId").select("userId").withColumn("id", functions.monotonically_increasing_id());
        //join with original
        Dataset<Row> joinedDataSet = usersDataSet.join(uniqueUsersWithId, "userId");
        joinedDataSet.show();

    }
}

The bean:

/**
 * Created by RGOVIND on 11/16/2016.
 */
public class UserObject {

    private String userId;
    private String itemId;
    private String rating;

    public UserObject() {
    }

    public UserObject(String userId, String itemId, String rating) {
        this.userId = userId;
        this.itemId = itemId;
        this.rating = rating;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public String getRating() {
        return rating;
    }

    public void setRating(String rating) {
        this.rating = rating;
    }

}

This prints:

+------+------+------+------------+
|userId|itemId|rating|          id|
+------+------+------+------------+
| abc13|    23|     1|403726925824|
| qwe34|    56|     3|901943132160|
| qwe34|    35|     4|901943132160|
+------+------+------+------------+
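
Since the data is meant for an ALS model: the ids produced by monotonically_increasing_id are unique but neither small nor contiguous (as the output above shows), and they may not fit within the integer-ranged user/item columns that spark.ml's ALS typically expects. If that becomes a problem, one possible alternative, sketched here without testing against your data, is spark.ml's StringIndexer, which maps each distinct userId to a contiguous index starting at 0:

import org.apache.spark.ml.feature.StringIndexer;

// Sketch only: StringIndexer assigns each distinct userId a contiguous
// 0-based index in a new double column named "userIdMapped".
Dataset<Row> indexed = new StringIndexer()
        .setInputCol("userId")
        .setOutputCol("userIdMapped")
        .fit(usersDataSet.toDF())
        .transform(usersDataSet.toDF());
indexed.show();

The index column is a double, so depending on how the rest of your pipeline is typed you may still need a cast.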

Regarding java - How to use Spark RDD and PairRDD together in Java, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40625202/
