我在我的 Java 应用程序中使用 Apache Spark。
我有两个 DataFrame
小号:df1
和 df2
. df1
包含 Row
与 email
, firstName
和 lastName
. df2
包含 Row
与 email
.
我想创建一个 DataFrame
: df3
包含 df1
中的所有行, df2
中不存在哪个电子邮件.
有没有办法用 Apache Spark 做到这一点?我试图创建 JavaRDD<String>
来自 df1
和 df2
通过类型转换它们 toJavaRDD()
和过滤 df1
包含所有电子邮件,然后使用 subtract
, 但我不知道如何映射新的 JavaRDD
至 ds1
得到 DataFrame
.
基本上我需要 df1
中的所有行谁的邮箱不在df2
.
DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer ");
DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
"WHERE product_id = '" + productId + "'");
JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));
List<String> notBoughtEmails = customers.javaRDD()
.map(row -> row.getString(0))
.subtract(customersBoughtEmail).collect();
最佳答案
Spark 2.0.0+
你可以直接使用NOT IN
。
Spark < 2.0.0
可以用outer join和filter来表达。
val customers = sc.parallelize(Seq(
("john@example.com", "John", "Doe"),
("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")
val customersWhoOrderedTheProduct = sc.parallelize(Seq(
Tuple1("jane@example.com")
)).toDF("email")
val customersWhoHaventOrderedTheProduct = customers.join(
customersWhoOrderedTheProduct.select($"email".alias("email_")),
$"email" === $"email_", "leftouter")
.where($"email_".isNull).drop("email_")
customersWhoHaventOrderedTheProduct.show
// +----------------+----------+---------+
// | email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com| John| Doe|
// +----------------+----------+---------+
等效的原始 SQL:
customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
"customersWhoOrderedTheProduct")
val query = """SELECT c.* FROM customers c LEFT OUTER JOIN
customersWhoOrderedTheProduct o
ON c.email = o.email
WHERE o.email IS NULL"""
sqlContext.sql(query).show
// +----------------+----------+---------+
// | email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com| John| Doe|
// +----------------+----------+---------+
关于java - 如何在 Apache Spark 中为两个具有不同结构的 DataFrame 实现 NOT IN,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33652013/