scala - 结构化的 Spark 流 leftOuter join 的行为类似于 inner join

标签 scala apache-spark left-join spark-streaming

我正在尝试结构化的 Spark Streaming 流-流连接,我的左外部连接的行为与内部连接完全相同。

Using spark version 2.4.2 and Scala version 2.12.8, Eclipse OpenJ9 VM, 1.8.0_252

这是我想做的,

  1. 创建每秒生成 1 行的速率流。
  2. 从中创建 Employee 和 Dept 流。
  3. 员工流 deptId 字段将 rate 值乘以 2,Dept 流 id 字段乘以 3
  4. 这样做的目的是让两个流有一些共同的和不共同的 id 字段。
  5. 执行 leftOuter 流-流连接,时间限制为 30 秒,并在连接左侧执行部门流。

期望: 在 30 秒的时间限制后,对于不匹配的行,我应该在连接的右侧看到 null。

发生了什么

  • 我只看到 ID 匹配的行,而不是不匹配的行。

代码 - 尝试使用 spark-shell

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

case class RateData(timestamp: Timestamp, value: Long)

// create rate source with 1 row per second.
val rateSource = spark.readStream.format("rate").option("rowsPerSecond", 1).option("numPartitions", 1).option("rampUpTime", 1).load()

import spark.implicits._
val rateSourceData = rateSource.as[RateData]

// employee stream departid increments by 2
val employeeStreamDS = rateSourceData.withColumn("firstName",  concat(lit("firstName"),rateSourceData.col("value")*2)).withColumn("departmentId", lit(floor(rateSourceData.col("value")*2))).withColumnRenamed("timestamp", "empTimestamp").withWatermark("empTimestamp", "10 seconds")

// dept stream id increments by 3
val departmentStreamDS = rateSourceData.withColumn("name", concat(lit("name"),floor(rateSourceData.col("value")*3))).withColumn("Id", lit(floor(rateSourceData.col("value")*3))).drop("value").withColumnRenamed("timestamp", "depTimestamp")

// watermark - 10s and time constraint is 30 secs on employee stream.
val joinedDS  =  departmentStreamDS.join(employeeStreamDS, expr(""" id = departmentId AND empTimestamp >= depTimestamp AND empTimestamp <= depTimestamp + interval 30 seconds """), "leftOuter")

val q = joinedDS.writeStream.format("parquet").trigger(Trigger.ProcessingTime("60 seconds")).option("checkpointLocation", "checkpoint").option("path", "rate-output").start

我在 10 分钟后查询了表的输出,我只找到了 31 个匹配的行。这与内部连接输出相同。

val df = spark.read.parquet("rate-output")
 df.count
res0: Long = 31
df.agg(min("departmentId"), max("departmentId")).show
+-----------------+-----------------+
|min(departmentId)|max(departmentId)|
+-----------------+-----------------+
|                0|              180|
+-----------------+-----------------+

输出说明。 employeeStreamDS 流,departmentId 字段值是 rate 值的 2 倍,因此是 2 的倍数。

departmentStreamDS 流,Id 字段是速率流值的 3 倍,因此是 3 的倍数。

所以每 6 个都会匹配 departmentId = Id,因为 LCM(2,3) = 6。 这种情况会发生,直到这些流之间存在 30 秒的差异(加入时间限制)。

我希望在 30 秒后,我将为部门流值(3,9,15 ..)等设置空值。

我希望我解释得足够好。

所以关于 spark streaming 的 left-outer join 行为的结果问题。

最佳答案

根据我的理解,确实根据 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins ,您需要在两个流的事件时间列上应用水印,例如:

val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
...
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
 )

您只定义了一个watermark

关于scala - 结构化的 Spark 流 leftOuter join 的行为类似于 inner join,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63181277/

相关文章:

scala - 使用 LAPACK-BLAS DGEMM 的矩阵点积

scala - 在 Scala.js 中,代码如何检测它是在浏览器窗口中运行还是在 WebWorker 中运行?

arrays - 在 Spark 中访问数组列

python - 如何让scala字符串拆分以匹配python

java - 使用大量列在 Scala 中转换数据框行

mysql - Left Join/IS NULL 如何消除一个表中存在而另一个表中没有的记录?

scala - 选择与 future

apache-spark - Spark : Reading files using different delimiter than new line

mysql - 左连接,我需要有关代码的解释

mysql - 比较 mysql 表以查看每个条目是否有可用记录