python - 工作线程内的 Queue.put 失败

标签 python python-3.x multithreading pandas queue

在工作线程内，我生成了一个 DataFrame，但把它放入传给该工作线程的队列时失败了。事实上，向该队列放入任何值都会失败。

工作线程 task1() 中失败的代码如下：

  # Excerpt of task1() from the full script below (indentation as pasted).
  # NOTE(review): assumes pandas is imported as `pd`; the full script's
  # imports do not include it, which would raise NameError in the thread.
  df = pd.DataFrame([[1,2,3,4],[3,4,5,6]])
    qmdlvalues.put(df)
    mdltiming = time.time() - start
    qmdlparams.put(paramval)
    qtiming.put(mdltiming)

完整代码

import threading
import queue
from sklearn.manifold import TSNE
import os
import time

def write_tsne_op(opdata, fname, header):
    """Write a header line followed by embedding rows to a CSV file.

    Args:
        opdata: Data to save. Either a 2-D array (one row per sample, e.g.
            the result of ``TSNE.fit_transform``) or an iterable of 2-D
            arrays. Each row is written as one comma-separated line.
        fname: Output path; an existing file is overwritten.
        header: Text written before the data. A newline is appended when it
            is non-empty and does not already end with one, so the first
            data row starts on its own line.
    """
    # The script's top-level imports do not include numpy.
    import numpy as np

    with open(fname, 'w') as outfile:
        outfile.write(header)
        if header and not header.endswith('\n'):
            outfile.write('\n')
        for data_slice in opdata:
            # np.savetxt writes a 1-D array as a single column (one value per
            # line). Promote rows to shape (1, k) so each row stays on one
            # line; 2-D slices pass through unchanged.
            np.savetxt(outfile, np.atleast_2d(data_slice), delimiter=",")

def task1(qmdlvalues,qmdlparams,qtiming,paramval):
    """Worker: publish a placeholder DataFrame with its label and timing.

    Puts a small DataFrame on ``qmdlvalues``, ``paramval`` on ``qmdlparams``
    and the elapsed seconds (float) on ``qtiming``, then prints all three.

    Args:
        qmdlvalues: Queue receiving the DataFrame.
        qmdlparams: Queue receiving ``paramval``.
        qtiming: Queue receiving the elapsed time in seconds.
        paramval: Label describing this run.
    """
    # pandas is not among the script's top-level imports; without this the
    # thread dies with NameError before anything reaches the queues.
    import pandas as pd

    start = time.time()
    # Placeholder for the real model:
    # tmpmdl1 = TSNE(perplexity=100,early_exaggeration=1, n_components=2,random_state=0,verbose=1)
    # qmdlvalues.put(tmpmdl1.fit_transform(dense_mx))
    df = pd.DataFrame([[1, 2, 3, 4], [3, 4, 5, 6]])
    qmdlvalues.put(df)
    mdltiming = time.time() - start
    qmdlparams.put(paramval)
    qtiming.put(mdltiming)
    print(df)
    print(str(mdltiming))
    print(paramval)

def task2(qmdlvalues,qmdlparams,qtiming,paramval):
    """Worker: publish a placeholder DataFrame with its label and timing.

    Puts a small DataFrame on ``qmdlvalues``, ``paramval`` on ``qmdlparams``
    and the elapsed seconds (float) on ``qtiming``.

    Args:
        qmdlvalues: Queue receiving the DataFrame.
        qmdlparams: Queue receiving ``paramval``.
        qtiming: Queue receiving the elapsed time in seconds.
        paramval: Label describing this run.
    """
    # pandas is not among the script's top-level imports; without this the
    # thread dies with NameError before anything reaches the queues.
    import pandas as pd

    start = time.time()
    # Placeholder for the real model:
    # tmpmdl2 = TSNE(perplexity=100,early_exaggeration=10, n_components=2,random_state=0,verbose=1)
    # qmdlvalues.put(tmpmdl2.fit_transform(dense_mx2))
    qmdlvalues.put(pd.DataFrame([[1, 2, 3, 4], [3, 4, 5, 6]]))
    qmdlparams.put(paramval)
    mdltiming = time.time() - start
    qtiming.put(mdltiming)

if __name__ == "__main__":
    # pandas is not among the script's top-level imports; mdlvalues needs it.
    import pandas as pd

    # NOTE(review): dense_mx is not defined in this script; it is presumably
    # created beforehand (e.g. in an interactive session) — confirm.
    dense_mx2 = dense_mx
    dense_mx3 = dense_mx

    qmdlvl = queue.Queue()
    qmdlch = queue.Queue()
    qtme   = queue.Queue()
    mdlvalues = pd.DataFrame()

    t1 = threading.Thread(target=task1,args=(qmdlvl,qmdlch,qtme,"#perplex: 100 early exag: 1 timing:$_plex100_exag1.csv"), name='t1')
    t2 = threading.Thread(target=task2,args=(qmdlvl,qmdlch,qtme,"#perplex: 100 early exag: 10 timing:$_plex100_exag10.csv"), name='t2')
    workers = (t1, t2)

    # starting threads
    for worker in workers:
        worker.start()

    # Consume results until every worker has exited AND the queue is drained.
    # The previous loop tested qmdlvl.empty() immediately after start(),
    # before the workers had put anything, so it exited at once.
    # is_alive() is evaluated before empty(): a worker's put() completes
    # before the thread is reported dead, so no result can be missed.
    while any(w.is_alive() for w in workers) or not qmdlvl.empty():
        try:
            item = qmdlvl.get(timeout=.5)
        except queue.Empty:
            continue
        # Previously this print followed `continue` and never ran.
        print("Got item:", item)
    print("Queue closed. Exiting thread.")

    # wait until all threads finish
    for worker in workers:
        worker.join()

下面是主线程中读取队列的代码，以及我得到的实际输出：

    # NOTE(review): this is the script's consumer loop, not program output.
    # qmdlvl.empty() is tested right after the workers are started, so if
    # they have not put anything yet the loop breaks immediately.
    while True:
        if qmdlvl.empty():
            print("Queue closed. Exiting thread.")   
            break
        try:
            item = qmdlvl.get(timeout=.5)

        # Bare except: swallows every exception, not just a get() timeout.
        except:
            continue
            # Unreachable: this print follows `continue`, so it never runs.
            print("Got item:", item)

ID of process running main program: 6456
Main thread name: MainThread
Queue closed. Exiting thread.

我希望能在工作线程中把 DataFrame 放入队列，并在主线程中取回同一个 DataFrame。

最佳答案

我之前的代码中存在参数不匹配的问题，现已纠正，完整的可运行代码如下。另外请注意，新代码会先 join 所有工作线程，然后才读取队列。原代码在线程刚启动时就检查 qmdlvl.empty()，此时队列通常还是空的，因此循环会立即退出。

我将 t-SNE 的输出直接放入队列，并在主线程中取出同一份输出。下一步计划是改用线程池，并改为通过子类化的方式实现。

import threading
import queue
from sklearn.manifold import TSNE
import os
import time

def write_tsne_op(opdata, fname, header):
    """Write a header line followed by embedding rows to a CSV file.

    Args:
        opdata: Data to save. Either a 2-D array (one row per sample, e.g.
            the result of ``TSNE.fit_transform``) or an iterable of 2-D
            arrays. Each row is written as one comma-separated line.
        fname: Output path; an existing file is overwritten.
        header: Text written before the data. A newline is appended when it
            is non-empty and does not already end with one, so the first
            data row starts on its own line.
    """
    # The script's top-level imports do not include numpy.
    import numpy as np

    with open(fname, 'w') as outfile:
        outfile.write(header)
        if header and not header.endswith('\n'):
            outfile.write('\n')
        for data_slice in opdata:
            # np.savetxt writes a 1-D array as a single column (one value per
            # line). Promote rows to shape (1, k) so each row stays on one
            # line; 2-D slices pass through unchanged.
            np.savetxt(outfile, np.atleast_2d(data_slice), delimiter=",")

def task1(ip_matrix,qmdlvalues,qmdlparam,plex,exag,qmdltime,qmdlhrfn,hderfname):
    """Worker: fit a 2-D t-SNE embedding and publish it with its metadata.

    Args:
        ip_matrix: Input data passed to ``TSNE.fit_transform``.
        qmdlvalues: Queue receiving the embedding.
        qmdlparam: Queue receiving the label ``"<plex>$<exag>"``.
        plex: t-SNE perplexity.
        exag: t-SNE early exaggeration.
        qmdltime: Queue receiving the elapsed fit time in seconds.
        qmdlhrfn: Queue receiving ``hderfname``.
        hderfname: ``"<header>$<output file name>"`` string for the consumer.
    """
    start = time.time()
    tmpmdl1 = TSNE(perplexity=plex,early_exaggeration=exag, n_components=2,random_state=0,verbose=1)
    embedding = tmpmdl1.fit_transform(ip_matrix)
    # Previously assigned to a misspelt `mdltimig`, so 0 was always reported.
    mdltiming = time.time() - start
    print(str(mdltiming)+"time")
    # NOTE(review): these queues are shared by all workers and each put() is
    # separate, so items from different workers can interleave across the
    # queues. Putting one tuple per task on a single queue would rule that
    # out; publishing back-to-back after the fit keeps the window small.
    qmdlvalues.put(embedding)
    qmdlparam.put(str(plex) + "$" + str(exag))
    qmdlhrfn.put(hderfname)
    qmdltime.put(mdltiming)

def task2(ip_matrix,qmdlvalues,qmdlparam,plex,exag,qmdltime,qmdlhrfn,hderfname):
    """Worker: fit a 2-D t-SNE embedding and publish it with its metadata.

    Args:
        ip_matrix: Input data passed to ``TSNE.fit_transform``.
        qmdlvalues: Queue receiving the embedding.
        qmdlparam: Queue receiving the label ``"<plex>$<exag>"``.
        plex: t-SNE perplexity.
        exag: t-SNE early exaggeration.
        qmdltime: Queue receiving the elapsed fit time in seconds.
        qmdlhrfn: Queue receiving ``hderfname``.
        hderfname: ``"<header>$<output file name>"`` string for the consumer.
    """
    start = time.time()
    tmpmdl2 = TSNE(perplexity=plex,early_exaggeration=exag, n_components=2,random_state=0,verbose=1)
    embedding = tmpmdl2.fit_transform(ip_matrix)
    # Previously assigned to a misspelt `mdltimig`, so 0 was always reported.
    mdltiming = time.time() - start
    # NOTE(review): these queues are shared by all workers and each put() is
    # separate, so items from different workers can interleave across the
    # queues. Putting one tuple per task on a single queue would rule that
    # out; publishing back-to-back after the fit keeps the window small.
    qmdlvalues.put(embedding)
    qmdlparam.put(str(plex) + "$" + str(exag))
    qmdlhrfn.put(hderfname)
    qmdltime.put(mdltiming)

def task3(ip_matrix,qmdlvalues,qmdlparam,plex,exag,qmdltime,qmdlhrfn,hderfname):
    """Worker: fit a 2-D t-SNE embedding and publish it with its metadata.

    Args:
        ip_matrix: Input data passed to ``TSNE.fit_transform``.
        qmdlvalues: Queue receiving the embedding.
        qmdlparam: Queue receiving the label ``"<plex>$<exag>"``.
        plex: t-SNE perplexity.
        exag: t-SNE early exaggeration.
        qmdltime: Queue receiving the elapsed fit time in seconds.
        qmdlhrfn: Queue receiving ``hderfname``.
        hderfname: ``"<header>$<output file name>"`` string for the consumer.
    """
    start = time.time()
    tmpmdl3 = TSNE(perplexity=plex,early_exaggeration=exag, n_components=2,random_state=0,verbose=1)
    embedding = tmpmdl3.fit_transform(ip_matrix)
    # Previously assigned to a misspelt `mdltimig`, so 0 was always reported.
    mdltiming = time.time() - start
    # NOTE(review): these queues are shared by all workers and each put() is
    # separate, so items from different workers can interleave across the
    # queues. Putting one tuple per task on a single queue would rule that
    # out; publishing back-to-back after the fit keeps the window small.
    qmdlvalues.put(embedding)
    qmdlparam.put(str(plex) + "$" + str(exag))
    qmdlhrfn.put(hderfname)
    qmdltime.put(mdltiming)

def task4(ip_matrix,qmdlvalues,qmdlparam,plex,exag,qmdltime,qmdlhrfn,hderfname):
    """Worker: fit a 2-D t-SNE embedding and publish it with its metadata.

    Args:
        ip_matrix: Input data passed to ``TSNE.fit_transform``.
        qmdlvalues: Queue receiving the embedding.
        qmdlparam: Queue receiving the label ``"<plex>$<exag>"``.
        plex: t-SNE perplexity.
        exag: t-SNE early exaggeration.
        qmdltime: Queue receiving the elapsed fit time in seconds.
        qmdlhrfn: Queue receiving ``hderfname``.
        hderfname: ``"<header>$<output file name>"`` string for the consumer.
    """
    start = time.time()
    tmpmdl4 = TSNE(perplexity=plex,early_exaggeration=exag, n_components=2,random_state=0,verbose=1)
    embedding = tmpmdl4.fit_transform(ip_matrix)
    # Previously assigned to a misspelt `mdltimig`, so 0 was always reported.
    mdltiming = time.time() - start
    # NOTE(review): these queues are shared by all workers and each put() is
    # separate, so items from different workers can interleave across the
    # queues. Putting one tuple per task on a single queue would rule that
    # out; publishing back-to-back after the fit keeps the window small.
    qmdlvalues.put(embedding)
    qmdlparam.put(str(plex) + "$" + str(exag))
    qmdlhrfn.put(hderfname)
    qmdltime.put(mdltiming)

if __name__ == "__main__":

    # print ID of current process
    print("ID of process running main program: {}".format(os.getpid()))

    # print name of main thread
    print("Main thread name: {}".format(threading.main_thread().name))

    # NOTE(review): dense_mx is not defined in this script; it is presumably
    # created beforehand (e.g. in an interactive session) — confirm.
    dense_mx2 = dense_mx
    dense_mx3 = dense_mx
    dense_mx4 = dense_mx

    qmdlvl = queue.Queue()
    qmdlch = queue.Queue()
    qmdltme = queue.Queue()
    qmdlhdrfname = queue.Queue()

    perplex = 200

    # creating threads; the last argument is "<header>$<output file name>"
    exag=10
    t1 = threading.Thread(target=task1,args=(dense_mx,qmdlvl,qmdlch,perplex,exag,qmdltme,qmdlhdrfname,"#perplex: 200 early exag: 10 timing:$_plex200_exag10.csv"), name='t1')

    exag=30
    t2 = threading.Thread(target=task2,args=(dense_mx2,qmdlvl,qmdlch,perplex,exag,qmdltme,qmdlhdrfname,"#perplex: 200 early exag: 30 timing:$_plex200_exag30.csv"), name='t2')

    exag=50
    t3 = threading.Thread(target=task3,args=(dense_mx3,qmdlvl,qmdlch,perplex,exag,qmdltme,qmdlhdrfname,"#perplex: 200 early exag: 50 timing:$_plex200_exag50.csv"), name='t3')

    exag=100
    t4 = threading.Thread(target=task4,args=(dense_mx4,qmdlvl,qmdlch,perplex,exag,qmdltme,qmdlhdrfname,"#perplex: 200 early exag: 100 timing:$_plex200_exag100.csv"), name='t4')

    workers = (t1, t2, t3, t4)

    # starting threads
    for worker in workers:
        worker.start()

    # Wait until all threads finish. After this every put() has completed,
    # so the queues can be drained without blocking or timeouts.
    for worker in workers:
        worker.join()

    while not qmdlvl.empty():
        try:
            embedding = qmdlvl.get_nowait()
            # Dequeued only to stay in step with qmdlvl; not used further.
            qmdlch.get_nowait()
            qmdltme.get_nowait()
            header, fname = qmdlhdrfname.get_nowait().split('$')
        except queue.Empty:
            # A worker stopped part-way through publishing, so the queues no
            # longer line up; report it instead of silently dropping data.
            print("Result queues out of sync; skipping remaining results.")
            break
        write_tsne_op(embedding, fname, header)
    print("Queue closed. Exiting thread.")


关于python - 工作线程内的 Queue.put 失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54273401/

相关文章:

c# - .NET 中的线程与进程

python - 斯坦福 NER 未标记日期和时间

python - 如何使用Python Selenium webdriver获取li内span的值?

Python TimedRotatingFileHandler 覆盖日志

python - 如何在 Python 中使用 Cognito 凭据调用 API 网关

c# - 从另一个线程更新 oxyplot 模型

python - Python 中的欧拉计划 #15

Django + PostgreSQL 连接 - 无法使用服务器端游标

python - 简单 pygame 刺激中的对象滞后/传送

c - `pthread_mutex_trylock` 两个线程同时调用时阻塞