我正在尝试使用 JDBC 写入将 spark DF 插入到 Postgres。 postgres 表对其中一列有唯一约束,当要插入的 df 违反约束时,整个批处理将被拒绝并且 spark session 关闭并给出错误 duplicate key value violates unique constraint 这是正确的数据重复(已存在于数据库中) org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148
需要插入不违反约束的数据行并忽略失败的行,而不会使整个批处理失败。
使用的代码是:
mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"}
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)
我该怎么做?
最佳答案
遗憾的是,Spark 没有开箱即用的解决方案。我看到了许多可能的解决方案:
作为 forEachPartition 的一部分,在 PostgreSQL 数据库中实现冲突解决的业务逻辑功能。例如,捕获违反约束的异常,然后报告到日志。
放弃对 PostgreSQL 数据库的约束,使用自动生成的 PK 意味着可以在数据库中存储重复的行。重复数据删除逻辑可以进一步实现为每个 SQL 查询的一部分或每天/每小时运行重复数据删除。你可以看到例子 here .
如果除了您的 Spark 作业之外没有其他系统或进程写入 PostgreSQL 表,则可以使用连接操作进行过滤,以在 spark.write 类似 this 之前从 Spark Dataframe 中删除所有现有行。
希望我的想法对您有所帮助。
关于postgresql - 从 Dataframe 到 DB 的批量插入忽略 Pyspark 中的失败行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51612514/