python - Thread.getName() does not reflect the actual worker name

Tags: python mysql multithreading queue

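I wrote a Python script that imports market data into a MariaDB database. To speed up the import, I decided to use the threading module. First, a function fills a queue with URLs; the data is then downloaded from those URLs and imported into my database. Unfortunately, the import function seems to be handled by only one thread rather than by several.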

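import queue
from threading import Thread

import requests
import ujson
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# `mariadb` is the database connector module; its import is not shown in the original post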

num_threads = 4
threads = []
urls = queue.Queue()

def create_url():

   # ...
   # get the list of items (not shown)
   # ...

   for row in item_list:
      url = 'https://someurl=' + str(row[0])
      urls.put(url)

   return urls


def import_mo(urls):
    station_id = 60003760

    print(worker.getName())

    try:
        mariadb_connection = mariadb.connect(allthedbstuff)
        cursor = mariadb_connection.cursor()

        while (True):
            url = urls.get()
            print("%s processes %s queue# %s" % (worker.getName(), url, urls.qsize()))
            if url is None:
                break
            s = requests.Session()
            retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
            s.mount('https://', HTTPAdapter(max_retries=retries))
            jsonraw = s.get(url)
            jsondata = ujson.loads(jsonraw.text)

            for row in jsondata:
                if (row['location_id'] == station_id):
                    cursor.execute(
                        'INSERT INTO tbl_mo_tmp (order_id) VALUES (%s)', (row['order_id'], ))

                cursor.execute('SELECT order_id from tbl_mo WHERE order_id = %s',
                               (row['order_id'], ))
                exists_mo = cursor.fetchall()

                if len(exists_mo) != 0:
                    # print("updating order#", row['order_id'])
                    cursor.execute('UPDATE tbl_mo SET volume = %s, price = %s WHERE order_id = %s',
                                   (row['volume_remain'], row['price'], row['order_id'], ))
                    mariadb_connection.commit()
                else:
                    if (row['location_id'] == station_id):
                        # print("newly inserting order#", row['order_id'])
                        cursor.execute('INSERT INTO tbl_mo (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)',
                                       (row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
                    mariadb_connection.commit()
            urls.task_done()

    except mariadb.Error as error:
        mariadb_connection.rollback()  # roll back if any exception occurred

    finally:
        # closing database connection.
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()

def cleanup_mo():
   # ...
   # do cleanup stuff (not shown)
   # ...

create_url()

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(urls,))
    worker.daemon = True
    threads.append(worker)
    worker.start()


for i in range(num_threads):
    urls.put(None)

for worker in threads:
    worker.join()

cleanup_mo()

The output at the start shows:

Thread-1
Thread-2
Thread-3
Thread-4

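This shows that 4 separate worker threads were created, but once execution enters the while loop it looks as if only one worker is actually processing the fetched URLs.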

Thread-1 processes https://someurl=2 queue# 32
Thread-1 processes https://someurl=3 queue# 31
Thread-1 processes https://someurl=4 queue# 30
Thread-1 processes https://someurl=5 queue# 29
Thread-1 processes https://someurl=6 queue# 28
Thread-1 processes https://someurl=7 queue# 27
Thread-1 processes https://someurl=8 queue# 26
Thread-1 processes https://someurl=9 queue# 25
Thread-1 processes https://someurl=10 queue# 24
Thread-1 processes https://someurl=11 queue# 23
Thread-1 processes https://someurl=12 queue# 22
Thread-1 processes https://someurl=13 queue# 21
Thread-1 processes https://someurl=14 queue# 20
Thread-1 processes https://someurl=15 queue# 19
Thread-1 processes https://someurl=16 queue# 18
Thread-1 processes https://someurl=17 queue# 17
Thread-1 processes https://someurl=18 queue# 16

I would like the output to look like this (ideally):

Thread-1 processes https://someurl=2 queue# 32
Thread-2 processes https://someurl=3 queue# 31
Thread-3 processes https://someurl=4 queue# 30
Thread-4 processes https://someurl=5 queue# 29

What am I missing here?

Best answer

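Inside import_mo, `worker` is not a local variable. It is looked up as the module-level global that the loops at the bottom of the script keep rebinding. By the time the workers reach the while loop, the main thread is most likely waiting in `for worker in threads: worker.join()` with `worker` bound to the first thread, so every worker prints Thread-1 no matter which thread is actually running. Print a distinct "name" for each worker instead: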

def import_mo(i, urls):
    station_id = 60003760

    print('Worker', i)
    # etc
    # later:
        print("Worker %s processes %s queue# %s" % (i, url, urls.qsize()))

And create the threads like this:

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(i, urls))
    worker.daemon = True
    threads.append(worker)
    worker.start()
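
As an alternative to passing an index, each thread can ask for its own name with `threading.current_thread().name`. The following is a minimal sketch rather than the asker's script: the `demo-N` thread names, the `drain` function, and the integer jobs are placeholders invented for illustration, and the download and database work is left out. Each row records the name read through the shared global `worker` next to the name reported by `current_thread()`. In this sketch the global ends up bound to the last thread created; in the question's script the later `for worker in threads:` loop rebinds it to the first thread, which would explain the repeated `Thread-1`.

```python
import queue
import threading

jobs = queue.Queue()
seen = []  # rows of (name via the global, name via current_thread(), item)


def drain():
    """Consume jobs until a None sentinel arrives."""
    while True:
        item = jobs.get()
        if item is None:  # sentinel: no more work for this thread
            break
        # `worker` is resolved at call time as a global, so it names whatever
        # Thread object the main thread bound last, not the running thread.
        seen.append((worker.name, threading.current_thread().name, item))


threads = []
for i in range(4):
    worker = threading.Thread(target=drain, name=f"demo-{i + 1}", daemon=True)
    threads.append(worker)
    worker.start()

# Jobs are queued only after the loop above has finished, so by the time any
# thread reads `worker`, it already refers to the last thread ("demo-4").
for n in range(8):
    jobs.put(n)
for _ in threads:
    jobs.put(None)
for t in threads:
    t.join()

for via_global, via_current, item in seen:
    print(f"global says {via_global}, current_thread() says {via_current}, item {item}")
```

The first column is the same on every row, while the second column names the thread that really handled the item. How the items are spread across the threads depends on scheduling, so a single thread may still take most of a short queue.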

Regarding "python - Thread.getName() does not reflect the actual worker name", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/55612078/
