apache-spark - Spark 结构化流可扩展性和重复问题

标签 apache-spark snowflake-cloud-data-platform databricks spark-structured-streaming azure-eventhub

我正在 Databricks 集群上使用 Spark 结构化流从 Azure 事件中心提取数据,对其进行处理,然后使用 ForEachBatch 将其写入雪花,并将 Epoch_Id/Batch_Id 传递给 foreach 批处理函数。

我的代码如下所示:

ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_CONNECTION_STRING)
ehConf['eventhubs.consumerGroup'] = consumergroup

# Read stream data from event hub
spark_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

一些转换...

写入雪花

def foreach_batch_function(df, epoch_id):
       df.write\
            .format(SNOWFLAKE_SOURCE_NAME)\
            .options(**sfOptions)\
            .option("dbtable", snowflake_table)\
            .mode('append')\
            .save()

processed_df.writeStream.outputMode('append').\
    trigger(processingTime='10 seconds').\
    option("checkpointLocation",f"checkpoint/P1").\
    foreachBatch(foreach_batch_function).start()

目前我面临两个问题:

  1. 当节点发生故障时。虽然在 Spark 官方网站上,提到当在恢复表单节点故障期间使用 ForeachBatch 和 epoch_id/batch_id 时,不应该有任何重复项,但我确实发现我的雪花表中填充了重复项。引用链接:[Spark Structured Streaming ForEachBatch With Epoch Id][1]。

  2. 我遇到错误 a.)TransportClient: 无法将 RPC RPC 5782383376229127321 发送到/30.62.166.7:31116: java.nio.channels.ClosedChannelException 和 b.) TaskSchedulerImpl: Lost executor 1560 on 30.62.166.7: Worker Decommissioned: Worker Decommissioned 在我的 databricks 集群上非常频繁。无论我分配多少个执行程序或增加多少执行程序内存,集群都会达到最大工作限制,并且我会收到两个错误之一,在恢复后,我的雪花表中会填充重复项。

对上述任何一点的任何解决方案/建议都会有所帮助。

提前致谢。

最佳答案

foreachBatch 根据定义不是幂等的,因为当当前执行的批处理失败时,它会重试,并且可以观察到部分结果,这与您的观察结果相符。 foreachBatch 中的幂等写入为 applicable仅适用于 Delta Lake 表,不适用于所有接收器类型(在某些情况下,例如 Cassandra,它也可以工作)。我对 Snowflake 不太熟悉,但也许你可以实现类似于其他数据库的东西 - 将数据写入临时表(每个批处理都会进行覆盖),然后从该临时表合并到目标表中。

关于第二个问题 - 看起来您正在使用自动缩放集群 - 在这种情况下,工作人员可能会退役,因为集群管理器检测到集群未完全加载。为了避免这种情况,您可以禁用自动缩放,并使用固定大小的集群。

关于apache-spark - Spark 结构化流可扩展性和重复问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74423430/

相关文章:

azure datalake gen2 databricks ACL 权限

Azure Databricks 单元执行停留在等待运行状态

apache-spark - 如何在 spark sql 中使用 hive 钩子(Hook)

python - Spark(pyspark)中的决策树模型如何可视化?

datetime - 如何使用 Snowflake SQL 解析 ISO 8601 时间戳?

sql - 基于一列具有唯一值的频率的缺乏而对一列进行不同计数

python - Databricks API 2.0 - 创建 secret 范围 - TEMPORARILY_UNAVAILABLE

python - PySpark 和访问 HDFS

json - Spark 如何在 Scala 的两个 JSONS 中更改键的数量?

amazon-web-services - 将 Snowflake 中的数据作为 XLSX 文件保存到 S3 存储桶