我有一个 PostgreSQL
数据库,其中包含一个已定义的表和列。该表的主键是 (Id, datetime)
列的组合。我需要定期将来自 R
data.table
的不同 ID 的数据插入数据库。但是,如果特定 (Id, datetime)
组合的数据已经存在,则应该更新(覆盖)。我如何使用 RPostgres
或 RPostgreSQL
包来做到这一点?
当我尝试插入一个 data.table 其中一些 (Id, datetime) 行已经存在时,我收到一条错误消息,指出违反了主键约束:
dbWriteTable(con, table, dt, append = TRUE, row.names = FALSE)
Error in connection_copy_data(conn@ptr, sql, value) :
COPY returned error: ERROR: duplicate key value violates unique constraint "interval_data_pkey"
DETAIL: Key (id, dttm_utc)=(a0za000000CSdLoAAL, 2018-10-01 05:15:00+00) already exists.
CONTEXT: COPY interval_data, line 1
最佳答案
您可以使用我的具有 upsert 功能的 pg
包,或者只是从那里获取 upsert 代码:https://github.com/jangorecki/pg/blob/master/R/pg.R#L249
基本上都是别人评论里说的。将数据写入临时表,然后使用 on conflict
子句插入目标表。
pgSendUpsert = function(stage_name, name, conflict_by, on_conflict = "DO NOTHING", techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
cols = pgListFields(stage_name)
cols = setdiff(cols, c("run_id","r_timestamp")) # remove techstamp to have clean column list, as the fresh one will be used, if any
# sql
insert_into = sprintf("INSERT INTO %s.%s (%s)", name[1L], name[2L], paste(if(techstamp) c(cols, c("run_id","r_timestamp")) else cols, collapse=", "))
select = sprintf("SELECT %s", paste(cols, collapse=", "))
if(techstamp) select = sprintf("%s, %s::INTEGER run_id, '%s'::TIMESTAMPTZ r_timestamp", select, get_run_id(), format(Sys.time(), "%Y-%m-%d %H:%M:%OS"))
from = sprintf("FROM %s.%s", stage_name[1L], stage_name[2L])
if(!missing(conflict_by)) on_conflict = paste(paste0("(",paste(conflict_by, collapse=", "),")"), on_conflict)
on_conflict = paste("ON CONFLICT",on_conflict)
sql = paste0(paste(insert_into, select, from, on_conflict), ";")
pgSendQuery(sql, conn = conn, .log = .log)
}
#' @rdname pg
pgUpsertTable = function(name, value, conflict_by, on_conflict = "DO NOTHING", stage_name, techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
name = schema_table(name)
if(!missing(stage_name)){
stage_name = schema_table(stage_name)
drop_stage = FALSE
} else {
stage_name = name
stage_name[2L] = paste("tmp", stage_name[2L], sep="_")
drop_stage = TRUE
}
if(pgExistsTable(stage_name)) pgTruncateTable(name = stage_name, conn = conn, .log = .log)
pgWriteTable(name = stage_name, value = value, techstamp = techstamp, conn = conn, .log = .log)
on.exit(if(drop_stage) pgDropTable(stage_name, conn = conn, .log = .log))
pgSendUpsert(stage_name = stage_name, name = name, conflict_by = conflict_by, on_conflict = on_conflict, techstamp = techstamp, conn = conn, .log = .log)
}
关于r - 将 R data.table 插入/更新到 PostgreSQL 表中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57463441/