我对 python 多处理比较陌生,并且正在努力解决与该主题相关的许多问题。我最新的问题是多处理、sqlalchemy 和 postgres 的组合。通过这种组合,我有时会得到一个
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac
经过研究,我在文档中发现了这个提示:
https://docs.sqlalchemy.org/en/13/core/pooling.html "It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.
There are two approaches to dealing with this.
The first is, either create a new Engine within the child process, or upon an existing Engine, call Engine.dispose() before the child process uses any connections. This will remove all existing connections from the pool so that it makes all new ones. "
还有这个:
uWSGI, Flask, sqlalchemy, and postgres: SSL error: decryption failed or bad record mac "The issue ended up being uwsgi's forking.
When working with multiple processes with a master process, uwsgi initializes the application in the master process and then copies the application over to each worker process. The problem is if you open a database connection when initializing your application, you then have multiple processes sharing the same connection, which causes the error above."
我的解释是,在使用多处理时,我必须确保每个进程都使用一个新引擎。 在我的子进程中,只有一个类可以读写 postgres-db,所以我决定在类中定义一个 slqalchemy 引擎:
class WS_DB_Booker():
def __init__(self):
engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
Base_inside_class = declarative_base()
Base_inside_class.metadata.create_all(engine_inside_class)
session_factory_inside_class = sessionmaker(bind=engine_inside_class)
self.DBSession_inside_class = scoped_session(session_factory_inside_class)
def example_method_to_read_from_db(self):
try:
sql_alc_session = self.DBSession_inside_class()
sql_alc_session.query(and_so_on....
这在第一次试验中工作正常,没有任何问题。但我不确定这是在类中定义引擎的正确方法,还是会导致任何问题?
最佳答案
您如何 fork 您的流程或哪个组件执行 fork 并不是真正可以理解的。
您需要确保WS_DB_Broker
类在 fork 后实例化!
如果您使用错误的方式(在 fork 之前实例化),那么 Engine
可能已经在它的 Pool
中引用了一些 dbapi
连接.请参阅 SQLAlchemy documentation on working with engines为此。
为了让你的错误更明显,你可以这样做:
import os
class WS_DB_Booker():
def __init__(self):
# Remember the process id from the time of instantiation. If the
# interpreter is forked then the output of `os.getpid()` will change.
self._pid = os.getpid()
engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
Base_inside_class = declarative_base()
Base_inside_class.metadata.create_all(engine_inside_class)
session_factory_inside_class = sessionmaker(bind=engine_inside_class)
self._session = scoped_session(session_factory_inside_class)
def get_session():
if self._pid != os.getpid():
raise RuntimeError("Forked after instantiating! Please fix!")
return self._session()
def example_method_to_read_from_db(self):
try:
sql_alc_session = self.get_session()
# ^^^^^^^^^^^^^
# this may throw RuntimeError when used incorrectly, thus saving you
# from your own mistake.
sql_alc_session.query(and_so_on....
关于python-3.x - Postgres、sqlalchemy 和多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57933533/