postgresql - 从 Dataframe 到 DB 的批量插入忽略 Pyspark 中的失败行

标签 postgresql jdbc pyspark

我正在尝试使用 JDBC 写入将 spark DF 插入到 Postgres。 postgres 表对其中一列有唯一约束,当要插入的 df 违反约束时,整个批处理将被拒绝并且 spark session 关闭并给出错误 duplicate key value violates unique constraint 这是正确的数据重复(已存在于数据库中) org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148

需要插入不违反约束的数据行并忽略失败的行,而不会使整个批处理失败。

使用的代码是:

mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"} 
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)

我该怎么做?

最佳答案

遗憾的是,Spark 没有开箱即用的解决方案。我看到了许多可能的解决方案:

  1. 作为 forEachPartition 的一部分,在 PostgreSQL 数据库中实现冲突解决的业务逻辑功能。例如,捕获违反约束的异常,然后报告到日志。

  2. 放弃对 PostgreSQL 数据库的约束,使用自动生成的 PK 意味着可以在数据库中存储重复的行。重复数据删除逻辑可以进一步实现为每个 SQL 查询的一部分或每天/每小时运行重复数据删除。你可以看到例子 here .

  3. 如果除了您的 Spark 作业之外没有其他系统或进程写入 PostgreSQL 表,则可以使用连接操作进行过滤,以在 spark.write 类似 this 之前从 Spark Dataframe 中删除所有现有行。

希望我的想法对您有所帮助。

关于postgresql - 从 Dataframe 到 DB 的批量插入忽略 Pyspark 中的失败行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51612514/

相关文章:

python - 在 python/pyspark 中获取 k-means 质心和异常值

database - 记录两次插入数据库

java - jdbcTemplate.update 用于自动递增和唯一 ID 字段

php - 当脚本工作时,Postgres now() 时间戳不会改变

java - 大型数据库不应该使用hibernate吗

Java 和 PostgreSQL 存储过程 - 返回注册为输出参数,导致输入参数出现问题

python - pyspark 中的每月聚合

apache-spark - 使用 bucketBy 的 Spark 模式与 Hive 不兼容

ruby - 我尝试使用 DBI gem 连接我的 PostgreSQL 服务器

sql - 从表中的开始和结束日期开始在 Postgres 中生成_series