java - 如何在 Apache Spark 中为两个具有不同结构的 DataFrame 实现 NOT IN

标签 java sql apache-spark apache-spark-sql

我在我的 Java 应用程序中使用 Apache Spark。 我有两个 DataFrame小号:df1df2 . df1包含 Rowemail , firstNamelastName . df2包含 Rowemail .

我想创建一个 DataFrame : df3包含 df1 中的所有行, df2 中不存在哪个电子邮件.

有没有办法用 Apache Spark 做到这一点?我试图创建 JavaRDD<String>来自 df1df2通过类型转换它们 toJavaRDD()和过滤 df1包含所有电子邮件,然后使用 subtract , 但我不知道如何映射新的 JavaRDDds1得到 DataFrame .

基本上我需要 df1 中的所有行谁的邮箱不在df2 .

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer ");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
                            "WHERE product_id = '" + productId + "'");

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));

List<String> notBoughtEmails = customers.javaRDD()
                        .map(row -> row.getString(0))
                        .subtract(customersBoughtEmail).collect();

最佳答案

Spark 2.0.0+

你可以直接使用NOT IN

Spark < 2.0.0

可以用outer join和filter来表达。

val customers = sc.parallelize(Seq(
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
  Tuple1("jane@example.com")
)).toDF("email")

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")),
    $"email" === $"email_", "leftouter")
 .where($"email_".isNull).drop("email_")

customersWhoHaventOrderedTheProduct.show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

等效的原始 SQL:

customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
  "customersWhoOrderedTheProduct")

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN  
                 customersWhoOrderedTheProduct o
               ON c.email = o.email
               WHERE o.email IS NULL"""

sqlContext.sql(query).show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

关于java - 如何在 Apache Spark 中为两个具有不同结构的 DataFrame 实现 NOT IN,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33652013/

相关文章:

Java I/O 在服务器端阻塞线程

SQL搜索问题

amazon-web-services - Airflow Emr Dag 成功但集群未启动

java - Android Studio 在重新启动时显示错误

java - 使用按位运算符的两个数字的总和

mysql - 在控制台上运行相同的查询需要两次时间?

hadoop - 使用 Apache Spark 多次插入到表中

java - Spark异常: java. io.NotSerializedException : org. apache.spark.streaming.api.java.JavaStreamingContext

java - Java 7 中的套接字未接收输入

php - 如何为特定酒店添加房间类别