我们可以看到StateStoreRestoreExec如下。
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
numOutputRows += 1
row +: savedState.toSeq
}
}
这里,我想知道 row +: savingState.toSeq
的含义。我认为 row 是 UnsafeRow 的实例,savedState.toSeq 是 Seq 的实例。那么我们如何用+:
来操作它们呢。另一方面,我认为savedState是UnsafeRow的实例,而toSeq不是UnsaveRow的成员,那么savedState.toSeq
如何工作?
最佳答案
row
是 InternalRow
的实例,savedState
是 Option[UnsafeRow]
,它扩展了内部行
。这里发生的情况是,保存的状态从 Option[UnsafeRow]
转换为 Seq[UnsafeRow]
,然后将 row
实例添加到该序列。
当您对这些 UnsafeRow
对象进行 flatMap
时,您会得到一个 Iterator[UnsafeRow]
。
关于scala - "row +: savedState.toSeq"在StateStoreRestoreExec.doExecute中做了什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45821094/