python-3.x - Postgres、sqlalchemy 和多处理

标签 python-3.x postgresql sqlalchemy multiprocessing

我对 python 多处理比较陌生,并且正在努力解决与该主题相关的许多问题。我最新的问题是多处理、sqlalchemy 和 postgres 的组合。通过这种组合,我有时会得到一个

 sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac

经过研究,我在文档中发现了这个提示:

https://docs.sqlalchemy.org/en/13/core/pooling.html "It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.

There are two approaches to dealing with this.

The first is, either create a new Engine within the child process, or upon an existing Engine, call Engine.dispose() before the child process uses any connections. This will remove all existing connections from the pool so that it makes all new ones. "

还有这个:

uWSGI, Flask, sqlalchemy, and postgres: SSL error: decryption failed or bad record mac "The issue ended up being uwsgi's forking.

When working with multiple processes with a master process, uwsgi initializes the application in the master process and then copies the application over to each worker process. The problem is if you open a database connection when initializing your application, you then have multiple processes sharing the same connection, which causes the error above."

我的解释是,在使用多处理时,我必须确保每个进程都使用一个新引擎。 在我的子进程中,只有一个类可以读写 postgres-db,所以我决定在类中定义一个 slqalchemy 引擎:

class WS_DB_Booker():
    def __init__(self):

        engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
        Base_inside_class = declarative_base()
        Base_inside_class.metadata.create_all(engine_inside_class)
        session_factory_inside_class = sessionmaker(bind=engine_inside_class)
        self.DBSession_inside_class = scoped_session(session_factory_inside_class)

    def example_method_to_read_from_db(self):
        try:
            sql_alc_session = self.DBSession_inside_class()
            sql_alc_session.query(and_so_on....

这在第一次试验中工作正常,没有任何问题。但我不确定这是在类中定义引擎的正确方法,还是会导致任何问题?

最佳答案

您如何 fork 您的流程或哪个组件执行 fork 并不是真正可以理解的。

您需要确保WS_DB_Broker 类在 fork 后实例化!

如果您使用错误的方式(在 fork 之前实例化),那么 Engine 可能已经在它的 Pool 中引用了一些 dbapi 连接.请参阅 SQLAlchemy documentation on working with engines为此。

为了让你的错误更明显,你可以这样做:

import os

class WS_DB_Booker():
    def __init__(self):
        # Remember the process id from the time of instantiation. If the
        # interpreter is forked then the output of `os.getpid()` will change.
        self._pid = os.getpid()

        engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
        Base_inside_class = declarative_base()
        Base_inside_class.metadata.create_all(engine_inside_class)
        session_factory_inside_class = sessionmaker(bind=engine_inside_class)
        self._session = scoped_session(session_factory_inside_class)

    def get_session():
        if self._pid != os.getpid():
             raise RuntimeError("Forked after instantiating! Please fix!")
        return self._session()

    def example_method_to_read_from_db(self):
        try:
            sql_alc_session = self.get_session()  
            #                      ^^^^^^^^^^^^^
            # this may throw RuntimeError when used incorrectly, thus saving you
            # from your own mistake.
            sql_alc_session.query(and_so_on....

关于python-3.x - Postgres、sqlalchemy 和多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57933533/

相关文章:

python - SQLAlchemy 对于只读查询,如何使用 : session. expire() 与 session.commit()?

python-3.x - 具有可变长度多项式的曲线拟合

python - 遍历Python中的所有文件夹

sql - 触发器postgresql中的距离计算

python - 无法将 PostgreSQL 数据库从 docker 连接到 python

python - sqlalchemy 创建不带命名关键字的插入对象

python - 字典中的分数计算寻求改进

python - 没有名为 PyQt5.sip 的模块

Django-postgres : how can I verify whether the database connection is SSL?

python - 不使用声明性语法时从 SQL Alchemy 结果获取列名