apache-spark - Spark Streaming -> DStream.checkpoint 与 Spark Streaming.checkpoint

标签 apache-spark spark-streaming

我有 Spark 1.4 Streaming 应用程序,它从 Kafka 读取数据,使用有状态转换,批处理间隔为 15 秒。

为了使用有状态转换以及从驱动程序故障中恢复,我需要在流上下文上设置检查点。

此外,在 Spark 1.4 文档中,他们建议 DStream 检查点为批处理间隔的 5-10 倍。

所以我的问题是:

如果我只在 Spark 流上下文上设置检查点会发生什么?我猜 DStreams 会在每个批处理间隔进行检查点?

如果我在流上下文上以及从 Kafka 读取数据时设置检查点,我会设置:

DStream.checkpoint(90 seconds)

元数据检查点的间隔是多少,数据检查点的间隔是多少(即 DStreams)?

谢谢。

最佳答案

I guess DStreams will be checkpointed every batch interval?

不,Spark 会在每个批处理间隔乘以一个常量后对您的数据进行检查点。这意味着,如果您的批处理间隔为 15 秒,则每隔 15 秒的倍数就会检查一次数据。例如,在 mapWithState 中,这是一个有状态流,您可以看到批处理间隔乘以 10:

private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}

What will be the intervals for metadata checkpointing and what for data checkpointing (meaning DStreams)?

如果您在 DStream 上将检查点持续时间设置为 90 秒,那么这将是您的检查点持续时间,这意味着每 90 秒数据就会被检查一次。您无法直接在 StreamingContext 上设置检查点持续时间,您所能做的就是传递检查点目录。 checkpoint 的重载仅需要一个String:

/**
 * Set the context to periodically checkpoint the DStream operations for driver
 * fault-tolerance.
 * @param directory HDFS-compatible directory where the checkpoint
 *        data will be reliably stored.
 *        Note that this must be a fault-tolerant file system like HDFS.
 */
def checkpoint(directory: String)

编辑

对于updateStateByKey,检查点的时间似乎设置为批处理时间乘以Seconds(10)/slipDuration:

// Set the checkpoint interval to be slideDuration or 10 seconds,
// which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
  checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
  logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
}

关于apache-spark - Spark Streaming -> DStream.checkpoint 与 Spark Streaming.checkpoint,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37721421/

相关文章:

windows - 用于 winutils 和 hadoop/spark 的 Powershell chmod on/tmp/hive

python - Spark 3.0.0 错误创建 SparkSession : pyspark. sql.utils.IllegalArgumentException: <exception str() failed>

apache-spark - 为什么 python UDF 返回意外的日期时间对象,而在 RDD 上应用的相同函数给出了正确的日期时间对象

apache-spark - Spark Streaming 中的窗口?

scala insert to redis 给出任务不可序列化

scala - RDD 映射中的 Spark Scala 序列化错误

scala - 在 Apache Spark 中使用分类和数值特征对数据进行聚类

hadoop - java.lang.NullPointerException:在Spark Streaming作业中写入 Parquet 文件时,writeSupportClass不应为null

apache-spark - DStreams 的 Spark 流检查点

apache-spark - 在 PyCharm 中使用 Kafka 进行 Pyspark 流式处理