Python 多处理将命令传递给进程/池/队列?

标签 python python-multiprocessing

不幸的是,我正在尝试同时与大约 14 个树莓派进行通信(艺术项目、燃烧的人、不要问...)并且我正在使用 Paramiko 通过 SSH 连接到 RPi,然后发布各种其他命令:同步文件、启动服务器等...

为此,我一直在使用 python 中的多处理模块,但遇到了错误。连接到各种 RPis 后,我想让 python 脚本挂起并等待输入,例如:启动服务器(传递服务器名称、位置等),这将通过 Paramiko 发送 ssh 命令以开始运行每个 RPI 上的 python 脚本。

我的问题是:如何确保将适当的命令发送到正确的进程/池?例如,如果我实例化连接到各种 RPis 的类,然后发出启动服务器命令,我想: RPi_A 上的服务器用名称 A 初始化, RPi_B 上的服务器用名称 B 初始化, 而不是名为 B 的 RPi_A,等等......

我需要为此使用 process 命令吗?或者游泳池会起作用吗?如果是,则应用、apply_async、map、map_async。不幸的是,文档有点含糊。

示例代码:

import sys
import time
import paramiko
import multiprocessing as mp

login = 'pi'
password = 'raspberry'
serverIp = '192.168.1.143'

config = [
    {'hostname': 'pi1.local', 'name': 'carousel'},
    {'hostname': 'pi2.local', 'name': 'bench'}
]



class Dreamlandia():
    def __init__(self):
        pool = mp.Pool(processes=12)
        results = [pool.apply_async(self.connectToServer, args=(dreamlandObject,)) for dreamlandObject in config]
        output = [p.get() for p in results]

    def connectToServer(self, dreamlandObject):
        host = dreamlandObject['hostname']
        structureName = dreamlandObject['name']
        i = 1
        while True:
            try:
                ssh = paramiko.SSHClient()
                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                ssh.connect(host, username=login, password=password)
                print ("Connected to " + structureName)
                break
            except paramiko.AuthenticationException:
                print ("Authentication failed when connecting to " + structureName)
                sys.exit(1)
            except:
                print ("Could not SSH to " + structureName + ", waiting for it to start")
                i += 1
                time.sleep(1)

            # If we could not connect within time limit
            if i == 30:
                print ("Could not connect to " + structureName + ". Giving up")
                sys.exit(1)

最佳答案

您的主进程将负责创建、管理和终止您的子进程。

from threading import Thread
from Queue import Queue


def worker(name, queue):
    # connect() #connect to raspPI & stuff ...
    # while True: # in real example this loop ...
        cmd = queue.get()
        print "MSG: thread_%s %s\n" % (name, cmd)
        # execute(cmd) # send command to raspPI
        queue.task_done()

# spawn threads
queues = []
num_threads=4
for i in range(num_threads):
    q = Queue()
    queues.append(q)
    t = Thread(target=worker, args=(i,q))
    t.start()

# send message to every threads
for q in queues:
    q.put("hello", False)

自从我的 workers 结束后,我没有杀死我的子进程。您可能想为您的进程添加一个终止条件,否则您将不得不手动终止它。

关于Python 多处理将命令传递给进程/池/队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30686554/

相关文章:

python - 如何使用pyzmq正确发布和订阅最新消息?

Python 复制带条件的子列表

python - 根据该点的值绘制一个非方阵

Python 的 `urllib2` : Why do I get error 403 when I `urlopen` a Wikipedia page?

python - Flask + SSE : why is time. sleep() 需要吗?

Windows 上的 Python 多处理运行时错误

python脚本在没有线程的情况下并发执行

python - 为 flask 投票应用程序输入选项

python - 如何访问一个类的子类作为属性?

python - 如何从进程或线程实例返回值?