python - 在 Queue.task_done() 之后运行多线程代码

标签 python mysql queue python-multithreading

在经典的“线程/队列”应用程序中。我需要在我的“消费者”函数中进行进一步的计算。队列为空后,urls.task_done() 之后不再执行任何代码。

我正在从 JSON api 导入市场数据并将其导入到我的 MariaDB 数据库中。 在 API 上,我想要获取的每个项目都有一个自己的 url,因此我正在为函数中的所有可用 url 创建一个队列。 “消费者”函数处理队列,根据数据库中已有的数据导入一组新数据或更新现有条目。我已经尝试将实际的 while True 循环包装到它自己的函数中,但它对我不起作用。

def create_url():
    try:
        mariadb_connection = mariadb.connect(host='host
                                             database='db',
                                             user='user',                                             
                                           password='pw')

        cursor = mariadb_connection.cursor()

        cursor.execute('SELECT type_id from tbl_items')
        item_list = cursor.fetchall()
        print("Create URL - Record retrieved successfully")

        for row in item_list:

            url = 'https://someinternet.com/type_id=' + \
                str(row[0])
            urls.put(url)

        return urls

    except mariadb.Error as error:
        mariadb_connection.rollback()  
        print("Failed retrieving itemtypes from tbl_items table 
        {}".format(error))

    finally:
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()

def import(urls):
    list_mo_esi = []
    try:
        mariadb_connection = mariadb.connect(host='host',
                                             database='db',
                                             user='user',
                                             password='pw')

        cursor = mariadb_connection.cursor()

        while True:
            s = requests.Session()
            retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
            s.mount('https://', HTTPAdapter(max_retries=retries))
            jsonraw = s.get(urls.get())
            jsondata = ujson.loads(jsonraw.text)

            for row in jsondata:
                cursor.execute('SELECT order_id from tbl_mo WHERE order_id = %s',
                               (row['order_id'], ))
                exists_mo = cursor.fetchall()
                list_mo_esi.append(row['order_id'])

                if len(exists_mo) != 0:
                    print("updating order#", row['order_id'])
                    cursor.execute('UPDATE tbl_mo SET volume = %s, price = %s WHERE order_id = %s',
                                   (row['volume_remain'], row['price'], row['order_id'], ))
                    mariadb_connection.commit()
                else:
                        cursor.execute('INSERT INTO tbl_mo (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)',
                                       (row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
                        mariadb_connection.commit()

            urls.task_done()

    except mariadb.Error as error:
        mariadb_connection.rollback()  
        print("Failed retrieving itemtypes from tbl_items table {}".format(error))

我的函数的以下finally部分没有执行,但应该执行。

    finally:
        list_mo_purge = list(set(list_mo_sql)-set(list_mo_esi))
        cursor.execute('SELECT order_id FROM tbl_mo')
        list_mo_sql = cursor.fetchall()
        print(len(list_mo_esi))
        print(len(list_mo_sql))

        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()

主线程

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(urls,))
    worker.setDaemon(True)
    worker.start()

create_url()

urls.join()

所有任务完成后,我的工作人员在 urls.task_done() 之后立即停止执行代码。 但是,在函数 urls.task_done() 之后我还有一些代码需要执行以关闭数据库连接并清除旧条目中的数据库。我怎样才能让这个“最后”部分运行?

最佳答案

你还没有摆脱这段时间。

您应该执行以下操作:

if urls.empty():
    break

很可能您的import线程在urls.get()处被阻塞

关于python - 在 Queue.task_done() 之后运行多线程代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55591039/

相关文章:

php - 用MYSQL存储索引列表?

java - 具有批量和刷新功能的生产者/消费者

python - DRF 序列化器深度使字段在创建时被忽略

python - Python 程序,可追溯函数调用或实例化的代码并完美地将其全部打印出来

python - Elasticsearch Python 客户端添加geo_point

php - 分阶段形式和 "going back"(在浏览器中单击 "back")

python - 通过 pandas 访问远程 url 时处理 HTTP 身份验证

mysql - SELECT 中变量分配评估的顺序可能不同于返回行的顺序。在什么情况下会发生这种情况?

python:如何在线程与队列之间共享一个sqlite连接?

javascript - 具有 async/await 风格函数的 async.queue