python - 使用 psycopg2 批量更新 Postgres DB 中的行

标签 python postgresql psycopg2

我们需要对 Postgres 数据库中的许多行进行批量更新,并希望使用下面的 SQL 语法。我们如何使用 psycopg2 做到这一点?

UPDATE table_to_be_updated
SET msg = update_payload.msg
FROM (VALUES %(update_payload)s) AS update_payload(id, msg)
WHERE table_to_be_updated.id = update_payload.id
RETURNING *

尝试 1 - 传递值

我们需要将嵌套的可迭代格式传递给 psycopg2 查询。对于 update_payload ,我试过传递列表列表、元组列表和元组元组。这一切都因各种错误而失败。

尝试 2 - 使用 __conform__ 编写自定义类

我试图编写一个我们可以用于这些操作的自定义类,它将返回
(VALUES (row1_col1, row1_col2), (row2_col1, row2_col2), (...))

我按照以下说明进行了编码 here ,但很明显我做错了什么。例如,在这种方法中,我必须处理表内所有值的引用,这会很麻烦并且容易出错。
class ValuesTable(list):
    def __init__(self, *args, **kwargs):
        super(ValuesTable, self).__init__(*args, **kwargs)

    def __repr__(self):
        data_in_sql = ""
        for row in self:
            str_values = ", ".join([str(value) for value in row])
            data_in_sql += "({})".format(str_values)
        return "(VALUES {})".format(data_in_sql)

    def __conform__(self, proto):
        return self.__repr__()

    def getquoted(self):
        return self.__repr__()

    def __str__(self):
        return self.__repr__()

编辑:如果可以使用另一种语法以更快/更清晰的方式进行批量更新,而不是我原来的问题中的语法,那么我全神贯注!

最佳答案

要求:

  • Postgres 表,包含字段 id 和 msg(可能还有其他字段)
  • 包含 msg
  • 新值的 Python 数据
  • Postgres 表应该通过 psycopg2 更新

  • 示例表
    CREATE TABLE einstein(
       id CHAR(5) PRIMARY KEY,
       msg VARCHAR(1024) NOT NULL
    );
    

    测试数据
    INSERT INTO einstein VALUES ('a', 'empty');
    INSERT INTO einstein VALUES ('b', 'empty');
    INSERT INTO einstein VALUES ('c', 'empty');
    

    Python程序

    假设的、自包含的示例程序,引用了一位著名物理学家的名言。
    import sys
    import psycopg2
    from psycopg2.extras import execute_values
    
    
    def print_table(con):
        cur = con.cursor()
        cur.execute("SELECT * FROM einstein")
        rows = cur.fetchall()
        for row in rows:
            print(f"{row[0]} {row[1]}")
    
    
    def update(con, einstein_quotes):
        cur = con.cursor()
        execute_values(cur, """UPDATE einstein 
                               SET msg = update_payload.msg 
                               FROM (VALUES %s) AS update_payload (id, msg) 
                               WHERE einstein.id = update_payload.id""", einstein_quotes)
        con.commit()
    
    
    def main():
        con = None
        einstein_quotes = [("a", "Few are those who see with their own eyes and feel with their own hearts."),
                           ("b", "I have no special talent. I am only passionately curious."),
                           ("c", "Life is like riding a bicycle. To keep your balance you must keep moving.")]
    
        try:
            con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
            print_table(con)
            update(con, einstein_quotes)
            print("rows updated:")
            print_table(con)
    
        except psycopg2.DatabaseError as e:
    
            print(f'Error {e}')
            sys.exit(1)
    
        finally:
    
            if con:
                con.close()
    
    
    if __name__ == '__main__':
        main()
    

    准备好的报表替代
    import sys
    import psycopg2
    from psycopg2.extras import execute_batch
    
    
    def print_table(con):
        cur = con.cursor()
        cur.execute("SELECT * FROM einstein")
        rows = cur.fetchall()
        for row in rows:
            print(f"{row[0]} {row[1]}")
    
    
    def update(con, einstein_quotes, page_size):
        cur = con.cursor()
        cur.execute("PREPARE updateStmt AS UPDATE einstein SET msg=$1 WHERE id=$2")
        execute_batch(cur, "EXECUTE updateStmt (%(msg)s, %(id)s)", einstein_quotes, page_size=page_size)
        cur.execute("DEALLOCATE updateStmt")
        con.commit()
    
    
    def main():
        con = None
        einstein_quotes = ({"id": "a", "msg": "Few are those who see with their own eyes and feel with their own hearts."},
                           {"id": "b", "msg": "I have no special talent. I am only passionately curious."},
                           {"id": "c", "msg": "Life is like riding a bicycle. To keep your balance you must keep moving."})
    
        try:
            con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
            print_table(con)
            update(con, einstein_quotes, 100)  #choose some meaningful page_size here
            print("rows updated:")
            print_table(con)
    
        except psycopg2.DatabaseError as e:
    
            print(f'Error {e}')
            sys.exit(1)
    
        finally:
    
            if con:
                con.close()
    
    
    if __name__ == '__main__':
        main()
    

    输出

    上述程序将向调试控制台输出以下内容:
    a     empty
    b     empty
    c     empty
    rows updated:
    a     Few are those who see with their own eyes and feel with their own hearts.
    b     I have no special talent. I am only passionately curious.
    c     Life is like riding a bicycle. To keep your balance you must keep moving.
    

    关于python - 使用 psycopg2 批量更新 Postgres DB 中的行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58032991/

    相关文章:

    python - 使用 psycopg2 通过 SSL 连接到 Google Cloud SQL Postgres 数据库

    Python http.server 创建多个实例

    python - 如何使用自定义日志记录处理程序将记录器重定向到 wxPython textCtrl?

    postgresql - MOVE ALL IN "query-cursor_1"自动 sql 指令导致 AWS RDS Postgres 性能问题

    postgresql - 没有函数匹配给定的名称和参数类型(PostgreSQL 10.3 -CentOs 7.4)

    python - 在 Python 中避免 PostgreSQL 数据库中的重复数据

    python - Pandas groupby 并将函数应用于数字列

    Python phonenumber 正则表达式不够好

    java - 如何对数据库中作为二进制数据存储的文件进行索引?

    Python PostgreSQL 语句问题 psycopg2 cursor.execute(Table Union)