python - PySQLPool 和 Celery,正确的使用方法?

标签 python mysql multiprocessing celery connection-pooling

我想知道在 celery 任务中使用 mysql 池的正确方法是什么。 目前,我的任务模块(相关部分)是这样的:

from start import celery
import PySQLPool as pool

dbcfg = config.get_config('inputdb')
input_db = pool.getNewConnection(username=dbcfg['user'], password=dbcfg['passwd'], host=dbcfg['host'], port=dbcfg['port'], db=dbcfg['db'], charset='utf8')

dbcfg = config.get_config('outputdb')
output_db = pool.getNewConnection(username=dbcfg['user'], password=dbcfg['passwd'], host=dbcfg['host'], port=dbcfg['port'], db=dbcfg['db'], charset='utf8')

@celery.task
def fetch():
   ic = pool.getNewQuery(input_db)
   oc = pool.getNewQuery(output_db)

   count = 1
   for e in get_new_stuff():
      # do stuff with new stuff
      # read the db with ic
      # write to db using oc

      # commit from time to time
      if count % 1000:
         pool.commitPool()

   # commit whatever's left
   pool.commitPool()

在一台机器上最多可以同时运行 4 个 fetch() 任务(每个内核 1 个)。 但是,我注意到有时任务会挂起,我怀疑这是由于 mysql 造成的。 有关如何使用 mysql 和 celery 的任何提示?

谢谢!

最佳答案

我也在使用 celery 和 PySQLPool。

maria = PySQLPool.getNewConnection(username=app.config["MYSQL_USER"],
                                   password=app.config["MYSQL_PASSWORD"],
                                   host=app.config["MYSQL_HOST"],
                                   db='configuration')

def myfunc(self, param1, param2):

    query = PySQLPool.getNewQuery(maria, True)
    try:
        sSql = """
            SELECT * FROM table
            WHERE col1= %s AND col2 
            """
        tDatas = ( var1, var2)
        query.Query(sSql, tDatas)
        return query.record
    except Exception, e:
        logger.info(e)
        return False

@celery.task
def fetch():
    myfunc('hello', 'world')

关于python - PySQLPool 和 Celery,正确的使用方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20410342/

相关文章:

python - 如何在 Pyramid 的 URL 中添加语言代码?

c# - 如何使用 Odbc 插入 DateTime 对象?

mysql - PostgreSQL vs MySQL 简单查询性能

sql - 如何选择我的主键?

python - CommandError: Elasticsearch 中未命名任何模型或应用

python - 从一个列表列表中减去另一个列表列表

python - 如何正确使用 tf.nn.max_pool_with_argmax

"Resource temporarily unavailable"后的Python多处理池恢复

python - multiprocessing.Process 的日志输出

python - 属性错误: Can't pickle local object 'computation.。使用多处理队列的 function1