scala - "row +: savedState.toSeq"在StateStoreRestoreExec.doExecute中做了什么?

标签 scala apache-spark spark-structured-streaming

我们可以看到StateStoreRestoreExec如下。

case class StateStoreRestoreExec(
    keyExpressions: Seq[Attribute],
    stateId: Option[OperatorStateId],
    child: SparkPlan)
  extends UnaryExecNode with StateStoreReader {

  override protected def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")

  child.execute().mapPartitionsWithStateStore(
    getStateId.checkpointLocation,
    operatorId = getStateId.operatorId,
    storeVersion = getStateId.batchId,
    keyExpressions.toStructType,
    child.output.toStructType,
    sqlContext.sessionState,
    Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
      iter.flatMap { row =>
        val key = getKey(row)
        val savedState = store.get(key)
        numOutputRows += 1
        row +: savedState.toSeq
      }
}

这里,我想知道 row +: savingState.toSeq 的含义。我认为 row 是 UnsafeRow 的实例,savedState.toSeq 是 Seq 的实例。那么我们如何用+:来操作它们呢。另一方面,我认为savedState是UnsafeRow的实例,而toSeq不是UnsaveRow的成员,那么savedState.toSeq如何工作?

最佳答案

rowInternalRow 的实例,savedStateOption[UnsafeRow],它扩展了内部行。这里发生的情况是,保存的状态从 Option[UnsafeRow] 转换为 Seq[UnsafeRow],然后将 row 实例添加到该序列。

当您对这些 UnsafeRow 对象进行 flatMap 时,您会得到一个 Iterator[UnsafeRow]

关于scala - "row +: savedState.toSeq"在StateStoreRestoreExec.doExecute中做了什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45821094/

相关文章:

scala - Scala Play 应用程序中没有绑定(bind) OWrites 和 Reads 的实现

spark-structured-streaming - Spark 结构化流异常处理

apache-spark - 遗失的遗嘱执行人Spark

java - 从 Scala 到 Java 1.8

apache-spark - spark writeStream 到 kafka - awaitTermination() 与 awaitAnyTermination() 之间的区别

apache-spark - pyspark-结构化流到 Elasticsearch

scala - `sbt run` 导致添加依赖后编译报错

scala - 编写一个不返回任何内容的 Scala 方法

Scala - 如何使用映射从 (A,B) 类型的元组列表转换为 (B,A) 类型

apache-spark - Spark SQL - 使用 SQL 语句而不是表名通过 JDBC 加载数据