pyspark - Databricks 中的 StreamingQuery 增量表 - 描述历史

标签 pyspark spark-streaming databricks delta-lake aws-databricks

我有一个 Delta 表,我正在读取它作为 StreamingQuery。

使用 DESCRIBE History 查看增量表历史,我看到 99% 的 OperationMetrics 表明 numTargetRowsUpdates 为 0,大多数操作都是插入。但是,偶尔会有 2-3 个 numTargetRowsUpdates > 1。Delta 表上的操作不过是合并。

我是否仍可以使用 StreamingQuery 并将此数据作为流读取,否则我会出错吗?。 即:

df: DataFrame = spark \
                .readStream \
                .format("delta") \
                .load(f"{table_location}") \

         df.writeStream \
                    .format("delta") \
                    .outputMode("append") \
                    .option("checkpointLocation", f "{checkpoint}/{table_location}")\
                    .trigger(once=True) \
                    .foreachBatch(process_batch) \
                    .start()

现在我有另一个 Delta 表,它更像是客户信息的维度表,即电子邮件、姓名、上次上线时间等。 我最初将其作为附加的 StreamingQuery 阅读,但出现以下错误:java.lang.UnsupportedOperationException: Detected a data update

查看此表,在描述历史记录中,我看到发生了许多更新。问题:如果我将 StreamQuery 与 IgnoreChanges, True 一起使用,这是否会将更新的记录作为新记录发送,我可以在 foreachBatch 中进一步处理?

最佳答案

如果增量源中有更新或删除,读取流将抛出异常。这从 databricks documentation: 中也很清楚。

Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source.

如果您使用 IgnoreChanges, True,它不会抛出异常,但会为您提供更新的行 + 可能已经处理过的行。这是因为增量表中的所有内容都发生在文件级别。例如,如果您更新文件中的一行(大致),将发生以下情况:

  1. 查找并读取包含要更新的记录的文件
  2. 写一个新文件,其中包含更新的记录 + 旧文件中的所有其他数据
  3. 在事务日志中将旧文件标记为已删除,并将新文件标记为已添加
  4. 您的读取流会将整个新文件读取为"new"记录。这意味着您可以在自己的 Steam 中获得重复项。

文档中也提到了这一点。

ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, therefore your downstream consumers should be able to handle duplicates. ...

您必须决定这是否适合您的用例。如果您需要专门处理更新和删除数据 block 提供 Change Data Feed ,您可以在增量表上启用它。这会为您提供有关插入、追加和删除的行级详细信息(以一些额外的存储和 IO 为代价)。

关于pyspark - Databricks 中的 StreamingQuery 增量表 - 描述历史,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71866652/

相关文章:

apache-spark - 从 Spark 外部数据库中进行微批量查找

apache-kafka - 如何在不覆盖的情况下将 Spark Streaming 输出写入 HDFS

xml - 从 Scala 中的 StructType 中提取行标记架构以解析嵌套 XML

Azure Databricks 无法为 Autoloader 流创建事件网格订阅

scala - Spark - 将嵌套列更新为字符串

apache-spark - 如何识别 Spark Dataframe 中的离散状态(振荡)?

python-3.x - 如何在pyspark中压缩两列?

python - 如何从 PySpark 中的 map 方法返回空(null?)项?

scala - 卡夫卡+ Spark 流: Multi topic processing in single job

python - 如何从笔记本或 databricks 上的命令行运行 pytest?