python - SQLAlchemy 和消息消费者 - session 应该放在哪里?

标签 python sqlalchemy message-queue messaging

找不到任何如何与消费者一起使用 Session() 对象的示例。在 SQLAlchemy 文档中,讲述了如何处理 Web 应用程序、守护进程,但没有说明如何处理消费者。 我的代码如下所示:

        engine_1 = create_engine(
            'connection_string',
            pool_pre_ping=True,
            pool_size=5,
            pool_recycle=3600,
            max_overflow=5,
        )

        engine_2 = create_engine(
            'connection_string',
            pool_pre_ping=True,
            pool_size=5,
            pool_recycle=3600,
            max_overflow=5,
        )

我有两个引擎连接两个不同的数据库。 这是我的消费者的代码:

    db_session_1 = sessionmaker(autocommit=False, autoflush=False, bind=engine_1)
    db_session_2 = sessionmaker(autocommit=False, autoflush=False, bind=engine_2)
    msg_processor = MessageProcessor(db_session_1, db_session_2)

    for message in consumer: # <--- consumer it's a KafkaConsumer object. It's a permanent loop
        try:
            msg_processor.process(message)
        except Exception as err:
            logger.error(err)

这是一个 MessageProcessor 类:

class MessageProcessor:
    def __init__(self, session_1, session_2):
        self.user = UserService(session_1)
        self.address = AddressService(session_1, session_2)

    def process(self, message):
        try:
            values = json.loads(message.value)
            self.user.do_something(values)
            self.address.do_something(values)

        except Exception as e:
            logger.error(e)

问题在于消息之间是否存在延迟。一两分钟。由于与数据库的连接丢失,下一条消息将生成错误。

2022-03-17 13:50:13,540 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <pymysql.connections.Connection object at 0x7fd9098486a0> (reason: OperationalError:(2013, 'Lost connection to MySQL server during query'))
2022-03-17 13:50:13,540 DEBUG sqlalchemy.pool.impl.QueuePool Closing connection <pymysql.connections.Connection object at 0x7fd9098486a0>
ERROR:root:(pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: {sql from UserService.do_something(values)}]

而且每次都会发生!看起来来自 SQLAlchemy 的 Session() 对象无法在提交事务之前重新验证与数据库的连接。所以问题是:
我应该如何连接到永久运行并等待来自 Kafka Broker 的消息的进程中的数据库?

最佳答案

我对你的sessionmaker感到困惑用法。通常它像 Session = sessionmaker() 一样使用来获取 session 生成器(或 session 工厂)...然后您可以像 session = Session() 那样创建一个 session 。我没有看到第二步。

我会在每次迭代中使用 session 创建者创建一个单独的 session ,如下所示:


for message in consumer:
    with db_session_1() as session_1, db_session_2() as session_2:
        msg_processor = MessageProcessor(session_1(), session_2())
        try:
            msg_processor.process(message)
        except Exception as err:
            logger.error(err)

这是使用 sqlalchemy 1.4。

关于python - SQLAlchemy 和消息消费者 - session 应该放在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71529499/

相关文章:

python - 无法创建组(没有对文件的写入意图)

postgresql - 在 AWS ec2 实例上通过 sqlalchemy to_sql 过程插入;面临位数据类型列的问题

python - 为什么我的 Django 工厂函数在本地数据库中创建模型,而不是测试数据库?

python - 在 python 中使用 (in) 关键字测试字符串成员身份非常慢

python - 从 BaseQuery 对象获取结果

mysql - SQL:使用类似列表的数据在 col 中搜索匹配元素的有效方法

c# - 批量消费消息——RabbitMQ

iphone - XMPP 是否足够快以支持类似 MMO 的协作?

c++ - posix 管道作为工作队列

python - elasticsearch-py连接池