找不到任何如何与消费者一起使用 Session()
对象的示例。在 SQLAlchemy 文档中,讲述了如何处理 Web 应用程序、守护进程,但没有说明如何处理消费者。
我的代码如下所示:
engine_1 = create_engine(
'connection_string',
pool_pre_ping=True,
pool_size=5,
pool_recycle=3600,
max_overflow=5,
)
engine_2 = create_engine(
'connection_string',
pool_pre_ping=True,
pool_size=5,
pool_recycle=3600,
max_overflow=5,
)
我有两个引擎连接两个不同的数据库。 这是我的消费者的代码:
db_session_1 = sessionmaker(autocommit=False, autoflush=False, bind=engine_1)
db_session_2 = sessionmaker(autocommit=False, autoflush=False, bind=engine_2)
msg_processor = MessageProcessor(db_session_1, db_session_2)
for message in consumer: # <--- consumer it's a KafkaConsumer object. It's a permanent loop
try:
msg_processor.process(message)
except Exception as err:
logger.error(err)
这是一个 MessageProcessor
类:
class MessageProcessor:
def __init__(self, session_1, session_2):
self.user = UserService(session_1)
self.address = AddressService(session_1, session_2)
def process(self, message):
try:
values = json.loads(message.value)
self.user.do_something(values)
self.address.do_something(values)
except Exception as e:
logger.error(e)
问题在于消息之间是否存在延迟。一两分钟。由于与数据库的连接丢失,下一条消息将生成错误。
2022-03-17 13:50:13,540 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <pymysql.connections.Connection object at 0x7fd9098486a0> (reason: OperationalError:(2013, 'Lost connection to MySQL server during query'))
2022-03-17 13:50:13,540 DEBUG sqlalchemy.pool.impl.QueuePool Closing connection <pymysql.connections.Connection object at 0x7fd9098486a0>
ERROR:root:(pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: {sql from UserService.do_something(values)}]
而且每次都会发生!看起来来自 SQLAlchemy 的 Session()
对象无法在提交事务之前重新验证与数据库的连接。所以问题是:
我应该如何连接到永久运行并等待来自 Kafka Broker 的消息的进程中的数据库?
最佳答案
我对你的sessionmaker感到困惑用法。通常它像 Session = sessionmaker()
一样使用来获取 session 生成器(或 session 工厂)...然后您可以像 session = Session()
那样创建一个 session 。我没有看到第二步。
我会在每次迭代中使用 session 创建者创建一个单独的 session ,如下所示:
for message in consumer:
with db_session_1() as session_1, db_session_2() as session_2:
msg_processor = MessageProcessor(session_1(), session_2())
try:
msg_processor.process(message)
except Exception as err:
logger.error(err)
这是使用 sqlalchemy 1.4。
关于python - SQLAlchemy 和消息消费者 - session 应该放在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71529499/