python-3.x - Python 多线程生产者消费者模式

标签 python-3.x multithreading

我仍在学习如何编码,这是我第一次尝试多线程。 我读过很多多线程文章。我认为这些非常有帮助:

有很多事情需要考虑。特别是对于初学者。 不幸的是,当我尝试将这些信息付诸实践时,我的代码无法正常工作。

此代码背后的想法是读取包含逗号分隔数字行的simplified.txt。例如:0.275,0.28,0.275,0.275,36078。 生产者线程读取每一行并从行尾去除换行符。然后该行中的每个数字被拆分并分配一个变量。 然后将 Variable1 放入队列中。 消费者线程将拾取队列中的项目,将其平方,然后将一个条目添加到日志文件中。

我使用的代码来自this template 。这是我到目前为止的代码:

import threading
import queue
import time
import logging
import random
import sys

read_file = 'C:/temp/temp1/simplified.txt'
log1 = open('C:/temp/temp1/simplified_log1.txt', "a+")

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s',)

BUF_SIZE = 10
q = queue.Queue(BUF_SIZE)

class ProducerThread(threading.Thread):
    def __init__(self, name, read_file):
        super(ProducerThread,self).__init__()
        self.name = name
        self.read_file = read_file

    def run(self, read_file):
        while True:
            if not q.full():
                with open(read_file, 'r') as f:
                    for line in f:
                        stripped = line.strip('\n\r')
                        value1,value2,value3,value4,value5,value6,value7 = stripped.split(',')
                        q.put(value1)
                        logging.debug('Putting ' + str(value1) + ' : ' + str(q.qsize()) + ' items in queue')
                        time.sleep(random.random())
        return

class ConsumerThread(threading.Thread):
    def __init__(self, name, value1, log1):
        super(ConsumerThread,self).__init__()
        self.name = name
        self.value1 = value1
        self.log1 = log1
        return

    def run(self):
        while True:
            if not q.empty():
                value1 = q.get()
                sqr_value1 = value1 * value1
                log1.write("The square of " + str(value1) + " is " + str(sqr_value1))
                logging.debug('Getting ' + str(value1) + ' : ' + str(q.qsize()) + ' items in queue')
                time.sleep(random.random())
        return

if __name__ == '__main__':
    
    p = ProducerThread(name='producer')
    c = ConsumerThread(name='consumer')

    p.start()
    time.sleep(2)
    c.start()
    time.sleep(2)

当我运行代码时,出现此错误:

Traceback (most recent call last):
  File "c:/Scripta/A_Simplified_Producer_Consumer_Queue_v0.1.py", line 60, in <module>
    p = ProducerThread(name='producer')
TypeError: __init__() missing 1 required positional argument: 'read_file'

我不知道还需要在哪里添加read_file。 任何帮助将不胜感激。提前致谢。

最佳答案

您的 ProducerThread 类需要 2 个参数(nameread_file)作为其构造函数的参数,如其 __init__ 中所定义。 code> 方法,当您在主 block 中创建实例时,您仅提供第一个此类参数。你的第二堂课也有同样的问题。

您应该在创建实例时向构造函数提供 read_file ,或者只是将其从构造函数签名中删除,因为您似乎并没有使用它(您使用 read_file code> 传递到 run 函数,但我认为这是不正确的)。似乎您正在尝试从 Thread 父类(super class)重写该方法,并且我怀疑它是否需要这样的参数。

关于python-3.x - Python 多线程生产者消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54489332/

相关文章:

c++ - 合并 "bit"- 线程安全 vector <bool>

c++ - C++ 11 中的锁是否保证访问数据的新鲜度?

c++ - 如何编写第二个线程来监听传入的信号?

python - 'float' 对象不能在 python 中解释为整数

python - 键盘控制 Python

将可变数量的参数插入到字符串中的 Pythonic 操作

java - JPA使用flush来触发异常并停止执行

python - 基于月份列的多个箱线图

python-3.x - "Quota exceeded for quota group ' ReadGroup ' and limit ' 每个用户每 100 秒服务 google api 的读取请求

c# - 新线程无法识别已经创建的主线程单例