python - 为什么我的使用 Queue 、 threading.Thread 和 subprocess 的多线程 python 脚本如此不稳定

标签 python multithreading subprocess

我有三个 shell 脚本 P1 、 P2 和 P3 ,我正在尝试链接它们。这三个 shell 脚本需要串联运行,但在任何给定时间都可以运行多个 P1、P2 和 P3。

我需要在数十个文件上快速运行这些,因此希望使用线程并并行执行操作。

我使用 python Thread 、 Queue 和 subprocess 模块来实现这一点。

我的问题是,当我的线程数大于 1 时,程序行为不稳定,并且线程不会以可重现的方式相互切换。有时所有五个线程都能完美工作并完成工作。

这是我第一次尝试使用线程做某事,我确信这是因为涉及竞争条件的线程的常见问题。但我想知道如何清理我的代码。

实际代码位于(https://github.com/harijay/xtaltools/blob/master/process_multi.py)。下面给出伪代码。如果代码困惑,请见谅。

我的问题是为什么使用这种设计会出现不稳定的行为。线程在任何给定时间都访问不同的文件。 subprocess.call 仅在 shell 脚本完成且其生成的文件写入磁盘时返回。

我可以采取哪些不同的做法? 我试图在这里尽可能简洁地解释我的设计。

我的基本设计:

P1_Queue = Queue()
P2_Queue = Queue()
P3_Queue = Queue()

class P1_Thread(Thread):
    def __init__(self,P1_Queue,P2_Queue):
        Thread.__init__(self)
        self.in_queue = P1_Queue
        self.out_queue = P2_Queue

    def run(self):
        while True:
            my_file_to_process = self.in_queue.get()
            if my_file_to_process = None:
                break
            P1_runner = P1_Runner(my_file_to_process)
            P1_runner.run_p1_using_subprocess()
            self.out_queue.put(my_file_to_process)

类 p1 Runner 获取输入文件句柄,然后调用 subprocess.call() 来运行使用文件输入的 shell 脚本,并使用 run_p1_using_subprocess 方法生成新的输出文件。

class P1_runner(object):

    def __init__(self,inputfile):
        self.my_shell_script = """#!/usr/bin/sh
prog_name <<eof
input 1
...
eof"""
       self.my_shell_script_file = open("some_unique_p1_file_name.sh")
       os.chmod("some_unique_file_name.sh",0755)

    def run_p1_using_subprocess(self):
        subprocess.call([self.my_shell_script_file])

I have essentially similar classes for P2 and P3 . All of which call a shell script that is custom generated

The chaining is achieved using a series of Thread Pools.
p1_worker_list = []
p2_worker_list = []
p3_worker_list = []

for i in range(THREAD_COUNT):
    p1_worker = P1_Thread(P1_Queue,P2_Queue)
    p1_worker.start()
    p1_worker_list.append(p1_worker)

for worker in p1_worker_list:
    worker.join()

And then again the same code block for p2 and p3

for i in range(THREAD_COUNT):
    p2_worker = P2_Thread(P2_Queue,P3_Queue)
    p2_worker.start()
    p2_worker_list.append(p1_worker)

for worker in p2_worker_list:
    worker.join()

非常感谢您的帮助/建议

最佳答案

这真的很糟糕:

runner.run()

您不应该手动调用线程的 run 方法。您可以使用 .start() 启动一个线程。您的代码一团糟,这里没有人会费力地通过它来查找您的错误。

关于python - 为什么我的使用 Queue 、 threading.Thread 和 subprocess 的多线程 python 脚本如此不稳定,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4549889/

相关文章:

python - 附加到 SQLAlchemy 列表时出错

javascript - 如何在 CSV 下表示 JSON 对象

java - 为什么一个后台线程与主服务线程具有相同的线程ID?

iOS 从另一个类/线程更新 UI

python - 调用包含 "echo"和 "|"的命令行

python - 用 Pandas 连接列

python - Pandas - 在日内数据中广播每日数据

java - 可以恢复线程但无法完全停止

Python 子进程在 Windows 上删除 reg key

python-3.x - 在远程服务器上执行 tensorflow python3,子进程无法解释所有 tensorflow 输入 args。导入错误 : No module named 'scripts'