scala - 不支持没有相等谓词的流流连接

标签 scala apache-spark apache-spark-sql spark-structured-streaming

我正在使用 Spark 2.3 并尝试连接两个数据流。我的左流和右流都有一个数组。只有当右流数组是左流数组的子集时,我才想加入这两个流。

例如,我的 streaA 看起来像这样:

StreamA:
|---|------|---------------------|-----------|
|id | dept | employeesInMeetings | DateTime  |
|---|------|---------------------|-----------|
| 1 | sales| [John]              | 7/2 14:00 |
| 2 | mktg | [Adam, Mike]        | 7/2 12:30 |
| 3 | hr   | [Rick, Jill, Andy]  | 7/2 14:00 |
|---|------|---------------------|-----------|

我的 streamB 如下所示:

StreamB:
|--------------|--------------|----------|
|employees     | confRooms    | DateTime |
|--------------|--------------|----------|
| [John, Jane] |      A       | 7/2 14:00|
| [Adam, Mike] |      C       | 7/2 12:30| 
| [Jill, Andy] |      B       | 7/2 14:00|
|--------------|--------------|----------|

我只关心参加同一次 session 的同一部门的员工。因此,作为交集的结果,我的结果流需要如下所示:

|---|------|---------------------|-----------|----------|
|id | dept | employeesInMeetings | DateTime  | confRoom |
|---|------|---------------------|-----------|----------|
| 2 | mktg | [Adam, Mike]        | 7/2 12:30 |    C     |
| 3 | hr   | [Rick, Jill, Andy]  | 7/2 14:00 |    B     |
|---|------|---------------------|-----------|----------|

我创建了一个 UDF 来进行相交:

val arrayIntersect = udf((leftArr: Array[String], rightArr: Array[String]) => {
  import spark.implicits._
  if(leftArr.intersect(rightArr.toSeq).length == rightArr.size){
    true
  } else {
    false
  }
})

并尝试按如下方式使用它:

streamA.joinWith(streamB, expr("arrayIntersect(leftArr, rightArr) AND streamA.DateTime BETWEEN streamB.DateTime and streamB.DateTime + INTERVAL 12 hours"))

但是,我得到了错误:

org.apache.spark.sql.AnalysisException: Stream stream joins without equality predicate is not supported;

有人知道这里是否有解决方法吗?任何帮助将不胜感激!谢谢!

最佳答案

不幸的是,在流-流连接中没有解决这个问题的方法:(

我们确实需要一个相等谓词,因为我们使用它来执行使用流式对称散列连接算法的连接——两个流都使用公共(public) key 进行分区,以便来自两个流的相关记录最终位于同一分区中。

关于scala - 不支持没有相等谓词的流流连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51164701/

相关文章:

apache-spark - Spark 中的任务是什么? Spark Worker如何执行jar文件?

apache-spark - spark 3.1.2 与 3.0.2 相比有什么显着变化导致在有足够内存可用时使用磁盘空间

azure - UPDATE、DROP COLUMN 和 EXCEPT 在 Spark SQL 中不起作用

java - Play 2.5.x 和 Akka 2.4.7 中构造函数 MyWebSocketActor 的参数过多

java - 如何仅当后面的字符是数字而前一个字符是字母时才用 "-"分割字符串? Java/Scala

hadoop - 保存 rdd 时 saveAsTextFile 失败

apache-spark - Databricks notebook time out error when calling other notebooks : com. databricks.WorkflowException : java.net.SocketTimeoutException: 读取超时

date - to_date 在格式 yyyyww 上给出空值(202001 和 202053)

scala - 当其中一个类型参数应该为 Nothing 时,为什么 Scala 的隐式类不起作用?

scala - 类型安全配置 : Load additional config from path external to packaged scala application