python-3.x - Psycopg2 在类内自动重新连接

标签 python-3.x psycopg2

我有类(class)可以连接到我的数据库。

import psycopg2, psycopg2.extensions
from parseini import config
import pandas as pd, pandas.io.sql as sqlio


class MyDatabase:
    def __init__(self, name='mydb.ini'):
        self.params = config(filename=name)
        self.my_connection = psycopg2.connect(**self.params)
        self.my_cursor = self.my_connection.cursor()

    def fetch_all_as_df(self, sql_statement):
        return sqlio.read_sql_query(sql_statement, self.my_connection)

    def df_to_sql(self, df):
        table = 'sometable'
        return sqlio.to_sql(df, table, self.my_connection)

    def __del__(self):
        self.my_cursor.close()
        self.my_connection.close()

在我的情况下,如何重新连接到数据库并处理 psycopg2.OperationalError ?

最佳答案

您可以制作一个装饰器,在 psycopg2.InterfaceError 时尝试重新连接或 psycopg2.OperationalError被提出。
这只是它如何工作的一个例子,可能需要调整:

import time
from functools import wraps
import psycopg2, psycopg2.extensions


def retry(fn):
    @wraps(fn)
    def wrapper(*args, **kw):
        cls = args[0]
        for x in range(cls._reconnectTries):
            print(x, cls._reconnectTries)
            try:
                return fn(*args, **kw)
            except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
                print ("\nDatabase Connection [InterfaceError or OperationalError]")
                print ("Idle for %s seconds" % (cls._reconnectIdle))
                time.sleep(cls._reconnectIdle)
                cls._connect()
    return wrapper


class MyDatabase:
    _reconnectTries = 5
    _reconnectIdle = 2  # wait seconds before retying

    def __init__(self, name='mydb.ini'):
        self.my_connection = None
        self.my_cursor = None
        self.params = config(filename=name)
        self._connect()

    def _connect(self):
        self.my_connection = psycopg2.connect(**self.params)
        self.my_cursor = self.my_connection.cursor()

    @retry
    def fetch_all_as_df(self, sql_statement):
        return sqlio.read_sql_query(sql_statement, self.my_connection)

    @retry
    def dummy(self):
        self.my_cursor.execute('select 1+2 as result')
        return self.my_cursor.fetchone()

    @retry
    def df_to_sql(self, df):
        table = 'sometable'
        return sqlio.to_sql(df, table, self.my_connection)

    def __del__(self):
        # Maybe there is a connection but no cursor, whatever close silently!
        for c in (self.my_cursor, self.my_connection):
            try:
                c.close()
            except:
                pass


db = MyDatabase()
time.sleep(30)  # some time to shutdown the database
print(db.dummy())

输出:
Database Connection [InterfaceError or OperationalError]
Idle for 2 seconds

Database Connection [InterfaceError or OperationalError]
Idle for 2 seconds

Database Connection [InterfaceError or OperationalError]
Idle for 2 seconds

Database Connection [InterfaceError or OperationalError]
Idle for 2 seconds
(3,)

注:_connect本身没有被修饰,所以这段代码假设一个初始连接 总是 作品!

关于python-3.x - Psycopg2 在类内自动重新连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62094660/

相关文章:

python - 用正则表达式匹配数字序列

python - 为不同的 python 版本准备 Pypi 包

psycopg2 - 插入整数数组

python-3.x - 在线程中使用 psycopg2 游标的正确方法是什么?

python - 如何将答案存储到 numpy 数组中并使用它来创建 Excel 工作表?

python - 如何规范 CSV 中的字典?

python - python3中的array_column相当于什么

django - 复制到 Postgresql : Absolute Path Interpreted as Relative Path

python - 重用 psycopg2.execute 中的参数

python - Psycopg2 使用通配符导致 TypeError